Collect BeyondTrust Endpoint Privilege Management (EPM) logs

Supported in:

This document explains how to ingest BeyondTrust Endpoint Privilege Management (EPM) logs to Google Security Operations (SecOps) using two approaches: EC2-based collection and AWS Lambda-based collection, both writing to Amazon S3. The parser transforms raw JSON log data from BeyondTrust EPM into a structured format that conforms to the Google SecOps Unified Data Model (UDM). It first initializes default values for various fields, then parses the JSON payload and maps specific fields from the raw log into the corresponding UDM fields within the event.idm.read_only_udm object.
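
For example, a raw EPM event that carries host, process, and user details is mapped field by field into UDM. The snippet below is illustrative only: the sample values are hypothetical, and the target field names follow the UDM Mapping Table at the end of this document.

    # Illustrative only: a hypothetical raw EPM event and the UDM fields
    # the parser derives from it (see the UDM Mapping Table below).
    raw_event = {
        "host": {"hostname": "WKSTN-042", "ip": "10.0.1.15"},
        "process": {"pid": 4312, "command_line": "C:\\Tools\\installer.exe /quiet"},
        "user": {"id": "S-1-5-21-1004", "name": "jdoe"},
    }

    udm_fields = {
        "principal.hostname": raw_event["host"]["hostname"],
        "principal.ip": raw_event["host"]["ip"],
        "principal.process.pid": str(raw_event["process"]["pid"]),  # PID converted to string
        "principal.process.command_line": raw_event["process"]["command_line"],
        "principal.user.userid": raw_event["user"]["id"],
        "principal.user.user_display_name": raw_event["user"]["name"],
    }
    print(udm_fields)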

Before you begin

Make sure you have the following prerequisites:

  • Google SecOps instance
  • Privileged access to BeyondTrust Endpoint Privilege Management tenant or API
  • Privileged access to AWS (S3, IAM, Lambda/EC2, EventBridge)

Choose your integration method

You can choose between two integration methods:

  • Option 1: EC2-based collection: Uses an EC2 instance with scheduled scripts for log collection
  • Option 2: AWS Lambda-based collection: Uses serverless Lambda functions with EventBridge scheduling

Option 1: EC2-based Collection

Configure AWS IAM for Google SecOps ingestion

  1. Create a user following this user guide: Creating an IAM user.
  2. Select the created user.
  3. Select the Security credentials tab.
  4. Click Create access key in the Access keys section.
  5. Select Third-party service as the Use case.
  6. Click Next.
  7. Optional: Add a description tag.
  8. Click Create access key.
  9. Click Download CSV file to save the Access Key and Secret Access Key for future reference.
  10. Click Done.
  11. Select the Permissions tab.
  12. Click Add permissions in the Permissions policies section.
  13. Select Add permissions.
  14. Select Attach policies directly.
  15. Search for and select the AmazonS3FullAccess policy.
  16. Click Next.
  17. Click Add permissions.
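
If you manage IAM programmatically, the same setup can be scripted. The following is a minimal boto3 sketch under the same assumptions as the console steps above (the user name is an example; store the returned keys securely):

    # Sketch: script the IAM user, AmazonS3FullAccess attachment, and access key creation.
    import boto3

    iam = boto3.client("iam")
    user_name = "secops-ingest"  # example name

    iam.create_user(UserName=user_name)
    iam.attach_user_policy(
        UserName=user_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
    )
    key = iam.create_access_key(UserName=user_name)["AccessKey"]
    print("AccessKeyId:", key["AccessKeyId"])          # save for the feed or collector
    print("SecretAccessKey:", key["SecretAccessKey"])  # shown only once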

Configure BeyondTrust EPM for API access

  1. Sign in to the BeyondTrust Privilege Management web console as an administrator.
  2. Go to Configuration > Settings > API Settings.
  3. Click Create an API Account.
  4. Provide the following configuration details:
    • Name: Enter Google SecOps Collector.
    • API Access: Enable Audit (Read) and other scopes as required.
  5. Copy and save the Client ID and Client Secret.
  6. Copy your API base URL; it's typically https://<your-tenant>-services.pm.beyondtrustcloud.com (you'll use this as BPT_API_URL).
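
To confirm the credentials work before you build the collector, you can request a token directly. This is a minimal sketch assuming the OAuth endpoint and scope used later in this document (/oauth/connect/token with scope urn:management:api); replace the placeholder values with your own:

    # Quick credential check (sketch): request an OAuth token from BeyondTrust EPM.
    import requests

    BPT_API_URL = "https://<your-tenant>-services.pm.beyondtrustcloud.com"
    resp = requests.post(
        f"{BPT_API_URL}/oauth/connect/token",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data={
            "grant_type": "client_credentials",
            "client_id": "your-client-id",
            "client_secret": "your-client-secret",
            "scope": "urn:management:api",
        },
        timeout=60,
    )
    resp.raise_for_status()
    print("Token received, expires_in:", resp.json().get("expires_in"))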

Create an AWS S3 Bucket

  1. Sign in to the AWS Management Console.
  2. Go to AWS Console > Services > S3 > Create bucket.
  3. Provide the following configuration details:
    • Bucket name: my-beyondtrust-logs.
    • Region: [your choice] > Create.
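
If you prefer to create the bucket programmatically, a minimal boto3 sketch (assuming your AWS credentials are already configured and using the example bucket name above) looks like this:

    # Sketch: create the S3 bucket with boto3 (region value is an example).
    import boto3

    region = "us-east-1"  # use your own region
    s3 = boto3.client("s3", region_name=region)

    if region == "us-east-1":
        # us-east-1 does not accept a LocationConstraint
        s3.create_bucket(Bucket="my-beyondtrust-logs")
    else:
        s3.create_bucket(
            Bucket="my-beyondtrust-logs",
            CreateBucketConfiguration={"LocationConstraint": region},
        )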

Create an IAM Role for EC2

  1. Sign in to the AWS Management Console.
  2. Go to AWS Console > Services > IAM > Roles > Create role.
  3. Provide the following configuration details:
    • Trusted entity: AWS service > EC2 > Next.
    • Attach permission: AmazonS3FullAccess (or a policy scoped to your bucket) > Next.
    • Role name: EC2-S3-BPT-Writer > Create role.
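
The same role can be created from code. A sketch under the assumptions above (role name EC2-S3-BPT-Writer, AmazonS3FullAccess as the attached policy):

    # Sketch: create the EC2 role and its instance profile instead of using the console.
    import json
    import boto3

    iam = boto3.client("iam")
    role_name = "EC2-S3-BPT-Writer"

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ec2.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy))
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
    )
    # EC2 attaches roles through an instance profile of the same name
    iam.create_instance_profile(InstanceProfileName=role_name)
    iam.add_role_to_instance_profile(InstanceProfileName=role_name, RoleName=role_name)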

Launch and configure your EC2 Collector VM

  1. Sign in to the AWS Management Console.
  2. Go to Services.
  3. In the search bar, type EC2 and select it.
  4. In the EC2 dashboard, click Instances.
  5. Click Launch instances.
  6. Provide the following configuration details:
    • Name: Enter BPT-Log-Collector.
    • AMI: Select Ubuntu Server 22.04 LTS.
    • Instance type: t3.micro (or larger), then click Next.
    • Network: Make sure the Network setting is set to your default VPC.
    • IAM role: Select the EC2-S3-BPT-Writer IAM role from the menu.
    • Auto-assign Public IP: Enable (or make sure you can reach it using VPN) > Next.
    • Add Storage: Leave the default storage configuration (8 GiB), and then click Next.
    • Select Create a new security group.
    • Inbound rule: Click Add Rule.
    • Type: Select SSH.
    • Port: 22.
    • Source: your IP
    • Click Review and Launch.
    • Select or create a key pair.
    • Click Download Key Pair.
    • Save the downloaded PEM file. You will need this file to connect to your instance using SSH.
  7. Connect to your Virtual Machine (VM) using SSH.

Install collector prerequisites

  1. Run the following command:

    chmod 400 ~/Downloads/your-key.pem
    ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
    
  2. Update system and install dependencies:

    # Update OS
    sudo apt update && sudo apt upgrade -y

    # Install Python, Git
    sudo apt install -y python3 python3-venv python3-pip git

    # Create & activate virtualenv
    python3 -m venv ~/bpt-venv
    source ~/bpt-venv/bin/activate

    # Install libraries
    pip install requests boto3
    
  3. Create a directory & state file:

    sudo mkdir -p /var/lib/bpt-collector
    sudo touch /var/lib/bpt-collector/last_run.txt
    sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
    
  4. Initialize it (for example, to 1 hour ago):

    echo "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" > /var/lib/bpt-collector/last_run.txt
    

Deploy the BeyondTrust EPM Collector Script

  1. Create a project folder:

    mkdir ~/bpt-collector && cd ~/bpt-collector
    
  2. Export the required environment variables (for example, in ~/.bashrc ):

    export BPT_API_URL="https://<your-tenant>-services.pm.beyondtrustcloud.com"
    export BPT_CLIENT_ID="your-client-id"
    export BPT_CLIENT_SECRET="your-client-secret"
    export S3_BUCKET="my-beyondtrust-logs"
    export S3_PREFIX="bpt/"
    export STATE_FILE="/var/lib/bpt-collector/last_run.txt"
    export RECORD_SIZE="1000"
     
    
  3. Create collector_bpt.py and enter the following code:

    #!/usr/bin/env python3
    import os
    import sys
    import json
    import boto3
    import requests
    from datetime import datetime, timezone, timedelta

    # ── UTILS ──────────────────────────────────────────────────────────────

    def must_env(var):
        val = os.getenv(var)
        if not val:
            print(f"ERROR: environment variable {var} is required", file=sys.stderr)
            sys.exit(1)
        return val

    def ensure_state_file(path):
        d = os.path.dirname(path)
        if not os.path.isdir(d):
            os.makedirs(d, exist_ok=True)
        if not os.path.isfile(path):
            ts = (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
            with open(path, "w") as f:
                f.write(ts)

    # ── CONFIG ─────────────────────────────────────────────────────────────
    BPT_API_URL = must_env("BPT_API_URL")    # e.g., https://tenant-services.pm.beyondtrustcloud.com
    CLIENT_ID = must_env("BPT_CLIENT_ID")
    CLIENT_SECRET = must_env("BPT_CLIENT_SECRET")
    S3_BUCKET = must_env("S3_BUCKET")
    S3_PREFIX = os.getenv("S3_PREFIX", "")   # e.g., "bpt/"
    STATE_FILE = os.getenv("STATE_FILE", "/var/lib/bpt-collector/last_run.txt")
    RECORD_SIZE = int(os.getenv("RECORD_SIZE", "1000"))
    # ── END CONFIG ─────────────────────────────────────────────────────────

    ensure_state_file(STATE_FILE)

    def read_last_run():
        with open(STATE_FILE, "r") as f:
            ts = f.read().strip()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    def write_last_run(dt):
        with open(STATE_FILE, "w") as f:
            f.write(dt.strftime("%Y-%m-%dT%H:%M:%SZ"))

    def get_oauth_token():
        """
        Get OAuth2 token using client credentials flow
        Scope: urn:management:api (for EPM Management API access)
        """
        resp = requests.post(
            f"{BPT_API_URL}/oauth/connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "urn:management:api"
            }
        )
        resp.raise_for_status()
        return resp.json()["access_token"]

    def extract_event_timestamp(evt):
        """
        Extract timestamp from event, prioritizing the event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(evt, dict) and isinstance(evt.get("event"), dict):
            ts = evt["event"].get("ingested")
            if ts:
                return ts
        # Fallbacks for other timestamp fields
        timestamp_fields = ["timestamp", "eventTime", "dateTime", "whenOccurred", "date", "time"]
        for field in timestamp_fields:
            if field in evt and evt[field]:
                return evt[field]
        return None

    def parse_timestamp(ts):
        """
        Parse timestamp handling various formats
        """
        if isinstance(ts, (int, float)):
            # Handle milliseconds vs seconds
            return datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts, tz=timezone.utc)
        if isinstance(ts, str):
            if ts.endswith("Z"):
                return datetime.fromisoformat(ts.replace("Z", "+00:00"))
            dt = datetime.fromisoformat(ts)
            return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
        raise ValueError(f"Unsupported timestamp: {ts!r}")

    def fetch_events(token, start_date_iso):
        """
        Fetch events using the EPM API endpoint /management-api/v2/Events/FromStartDate.
        This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset.
        """
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        all_events, current_start_date = [], start_date_iso
        # Enforce the maximum RecordSize limit of 1000
        record_size_limited = min(RECORD_SIZE, 1000)
        for _ in range(10):  # MAX 10 iterations to prevent infinite loops
            resp = requests.get(
                f"{BPT_API_URL}/management-api/v2/Events/FromStartDate",
                headers=headers,
                params={"StartDate": current_start_date, "RecordSize": record_size_limited},
                timeout=300
            )
            resp.raise_for_status()
            data = resp.json()
            events = data.get("events", [])
            if not events:
                break
            all_events.extend(events)
            # If we got fewer events than the page size, we're done
            if len(events) < record_size_limited:
                break
            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break
            # Convert to ISO format and increment slightly to avoid duplicates
            try:
                dt = parse_timestamp(last_timestamp)
                # Add 1 second to avoid retrieving the same event again
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break
        return all_events

    def upload_to_s3(obj, key):
        boto3.client("s3").put_object(
            Bucket=S3_BUCKET,
            Key=key,
            Body=json.dumps(obj).encode("utf-8"),
            ContentType="application/json"
        )

    def main():
        # 1) determine window
        start_dt = read_last_run()
        end_dt = datetime.now(timezone.utc)
        START = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        END = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        print(f"Fetching events from {START} to {END}")

        # 2) authenticate and fetch
        try:
            token = get_oauth_token()
            events = fetch_events(token, START)

            # Filter events to only include those before our end time
            filtered_events = []
            for evt in events:
                evt_time = extract_event_timestamp(evt)
                if evt_time:
                    try:
                        evt_dt = parse_timestamp(evt_time)
                        if evt_dt <= end_dt:
                            filtered_events.append(evt)
                    except Exception as e:
                        print(f"Error parsing event timestamp {evt_time}: {e}")
                        # Include the event anyway if timestamp parsing fails
                        filtered_events.append(evt)
                else:
                    # Include events without timestamps
                    filtered_events.append(evt)

            count = len(filtered_events)
            if count > 0:
                # Upload events to S3
                timestamp_str = end_dt.strftime('%Y%m%d_%H%M%S')
                for idx, evt in enumerate(filtered_events, start=1):
                    key = f"{S3_PREFIX}{end_dt.strftime('%Y/%m/%d')}/evt_{timestamp_str}_{idx:06d}.json"
                    upload_to_s3(evt, key)
                print(f"Uploaded {count} events to S3")
            else:
                print("No events to upload")

            # 3) persist state
            write_last_run(end_dt)
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)

    if __name__ == "__main__":
        main()
     
    
  4. Make it executable:

     chmod  
    +x  
    collector_bpt.py 
    

Schedule Daily with Cron

  1. Run the following command:

    crontab -e
    
  2. Add the daily job at midnight UTC:

    0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
    

Option 2: AWS Lambda-based Collection

Collect BeyondTrust EPM prerequisites

  1. Sign in to the BeyondTrust Privilege Management web console as an administrator.
  2. Go to System Configuration > REST API > Tokens.
  3. Click Add Token.
  4. Provide the following configuration details:
    • Name: Enter Google SecOps Collector.
    • Scopes: Select Audit:Read and other scopes as required.
  5. Click Save and copy the token value.
  6. Copy and save in a secure location the following details:
    • API Base URL: Your BeyondTrust EPM API URL (for example, https://yourtenant-services.pm.beyondtrustcloud.com).
    • Client ID: From your OAuth application configuration.
    • Client Secret: From your OAuth application configuration.

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, beyondtrust-epm-logs-bucket).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created user.
  5. Select the Security credentials tab.
  6. Click Create access key in the Access keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Copy and paste the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json"
        }
      ]
    }
     
    
    • Replace beyondtrust-epm-logs-bucket if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy and the managed policy AWSLambdaBasicExecutionRole (for CloudWatch logging).

  6. Name the role BeyondTrustEPMLogExportRole and click Create role.
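
To sanity-check the role before wiring up the Lambda, you can ask the IAM policy simulator whether it allows the S3 write the function needs. This is a sketch; the account ID in the role ARN is a placeholder.

    # Sketch: verify the role allows s3:PutObject on the log bucket (ARN is a placeholder).
    import boto3

    iam = boto3.client("iam")
    role_arn = "arn:aws:iam::123456789012:role/BeyondTrustEPMLogExportRole"

    result = iam.simulate_principal_policy(
        PolicySourceArn=role_arn,
        ActionNames=["s3:PutObject"],
        ResourceArns=["arn:aws:s3:::beyondtrust-epm-logs-bucket/test.json"],
    )
    print(result["EvaluationResults"][0]["EvalDecision"])  # expect "allowed"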

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:
Setting          Value
Name             BeyondTrustEPMLogExport
Runtime          Python 3.13
Architecture     x86_64
Execution role   BeyondTrustEPMLogExportRole
  1. After the function is created, open the Code tab, delete the stub, and enter the following code (BeyondTrustEPMLogExport.py):

    import json
    import boto3
    import urllib3
    import os
    from datetime import datetime, timedelta, timezone
    from typing import Dict, List, Optional

    # Initialize urllib3 pool manager
    http = urllib3.PoolManager()

    def lambda_handler(event, context):
        """
        Lambda function to fetch BeyondTrust EPM audit events and store them in S3
        """
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']

        # BeyondTrust EPM API credentials
        BPT_API_URL = os.environ['BPT_API_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_SCOPE = os.environ.get('OAUTH_SCOPE', 'urn:management:api')

        # Optional parameters
        RECORD_SIZE = int(os.environ.get('RECORD_SIZE', '1000'))
        MAX_ITERATIONS = int(os.environ.get('MAX_ITERATIONS', '10'))

        s3_client = boto3.client('s3')

        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)

            # Get OAuth access token
            access_token = get_oauth_token(BPT_API_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPE)

            # Fetch audit events
            events = fetch_audit_events(BPT_API_URL, access_token, last_timestamp,
                                        RECORD_SIZE, MAX_ITERATIONS)

            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}beyondtrust-epm-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)

                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)

                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")

            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }

        except Exception as e:
            print(f"Error processing BeyondTrust EPM logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }

    def get_oauth_token(api_url: str, client_id: str, client_secret: str,
                        scope: str = "urn:management:api") -> str:
        """
        Get OAuth access token using client credentials flow for BeyondTrust EPM
        Uses the urn:management:api scope and the /oauth/connect/token endpoint
        """
        token_url = f"{api_url}/oauth/connect/token"
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        body = f"grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope={scope}"

        response = http.request('POST', token_url, headers=headers, body=body,
                                timeout=urllib3.Timeout(60.0))

        if response.status != 200:
            raise RuntimeError(f"Token request failed: {response.status} {response.data[:256]!r}")

        token_data = json.loads(response.data.decode('utf-8'))
        return token_data['access_token']

    def fetch_audit_events(api_url: str, access_token: str, last_timestamp: Optional[str],
                           record_size: int, max_iterations: int) -> List[Dict]:
        """
        Fetch audit events using the BeyondTrust EPM API endpoint
        /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters
        """
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        all_events = []
        current_start_date = last_timestamp or (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
        iterations = 0

        # Enforce maximum RecordSize limit of 1000
        record_size_limited = min(record_size, 1000)

        while iterations < max_iterations:
            # Use the EPM API endpoint and parameters
            query_url = f"{api_url}/management-api/v2/Events/FromStartDate"
            params = {
                'StartDate': current_start_date,
                'RecordSize': record_size_limited
            }

            response = http.request('GET', query_url, headers=headers, fields=params,
                                    timeout=urllib3.Timeout(300.0))

            if response.status != 200:
                raise RuntimeError(f"API request failed: {response.status} {response.data[:256]!r}")

            response_data = json.loads(response.data.decode('utf-8'))
            events = response_data.get('events', [])

            if not events:
                break

            all_events.extend(events)
            iterations += 1

            # If we got fewer events than RecordSize, we've reached the end
            if len(events) < record_size_limited:
                break

            # For pagination, update StartDate to the timestamp of the last event
            last_event = events[-1]
            last_timestamp = extract_event_timestamp(last_event)
            if not last_timestamp:
                print("Warning: Could not find timestamp in last event for pagination")
                break

            # Convert to datetime and add 1 second to avoid retrieving the same event again
            try:
                dt = parse_timestamp(last_timestamp)
                dt = dt + timedelta(seconds=1)
                current_start_date = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            except Exception as e:
                print(f"Error parsing timestamp {last_timestamp}: {e}")
                break

        return all_events

    def extract_event_timestamp(event: Dict) -> Optional[str]:
        """
        Extract timestamp from event, prioritizing the event.ingested field
        """
        # Primary (documented) path: event.ingested
        if isinstance(event, dict) and isinstance(event.get("event"), dict):
            ts = event["event"].get("ingested")
            if ts:
                return ts

        # Fallbacks for other timestamp fields
        timestamp_fields = ['timestamp', 'eventTime', 'dateTime', 'whenOccurred', 'date', 'time']
        for field in timestamp_fields:
            if field in event and event[field]:
                return event[field]

        return None

    def parse_timestamp(timestamp_str: str) -> datetime:
        """
        Parse timestamp string to datetime object, handling various formats
        """
        if isinstance(timestamp_str, (int, float)):
            # Unix timestamp (in milliseconds or seconds)
            if timestamp_str > 1e12:
                # Milliseconds
                return datetime.fromtimestamp(timestamp_str / 1000, tz=timezone.utc)
            else:
                # Seconds
                return datetime.fromtimestamp(timestamp_str, tz=timezone.utc)

        if isinstance(timestamp_str, str):
            # Try different string formats
            try:
                # ISO format with Z
                if timestamp_str.endswith('Z'):
                    return datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                # ISO format with timezone
                elif '+' in timestamp_str or timestamp_str.endswith('00:00'):
                    return datetime.fromisoformat(timestamp_str)
                # ISO format without timezone (assume UTC)
                else:
                    dt = datetime.fromisoformat(timestamp_str)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt
            except ValueError:
                pass

        raise ValueError(f"Could not parse timestamp: {timestamp_str}")

    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from the S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None

    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }

        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )

    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = '\n'.join(json.dumps(event, default=str) for event in events)

        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )

    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

        latest = None
        for event in events:
            timestamp = extract_event_timestamp(event)
            if timestamp:
                try:
                    event_dt = parse_timestamp(timestamp)
                    event_iso = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
                    if latest is None or event_iso > latest:
                        latest = event_iso
                except Exception as e:
                    print(f"Error parsing event timestamp {timestamp}: {e}")
                    continue

        return latest or datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
     
    
  2. Go to Configuration > Environment variables > Edit > Add new environment variable.

  3. Enter the following environment variables, replacing the example values with your own.

    Key             Example value
    S3_BUCKET       beyondtrust-epm-logs-bucket
    S3_PREFIX       beyondtrust-epm-logs/
    STATE_KEY       beyondtrust-epm-logs/state.json
    BPT_API_URL     https://yourtenant-services.pm.beyondtrustcloud.com
    CLIENT_ID       your-client-id
    CLIENT_SECRET   your-client-secret
    OAUTH_SCOPE     urn:management:api
    RECORD_SIZE     1000
    MAX_ITERATIONS  10
  4. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  5. Select the Configuration tab.

  6. In the General configuration panel, click Edit.

  7. Change Timeout to 5 minutes (300 seconds) and click Save.
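
After saving the configuration, you can trigger a one-off test run without waiting for the schedule. A minimal sketch using boto3, assuming the function name above and local AWS credentials with lambda:InvokeFunction permission:

    # Sketch: invoke the Lambda once to verify the configuration before scheduling it.
    import json
    import boto3

    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName="BeyondTrustEPMLogExport",
        InvocationType="RequestResponse",  # wait for the result
        Payload=json.dumps({}),
    )
    print(response["StatusCode"])
    print(response["Payload"].read().decode("utf-8"))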

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function BeyondTrustEPMLogExport.
    • Name: BeyondTrustEPMLogExport-1h.
  3. Click Create schedule.
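
The same schedule can also be created with the EventBridge Scheduler API. This is a sketch under the assumption that a scheduler execution role allowed to invoke the function already exists; the ARNs are placeholders.

    # Sketch: create the hourly schedule programmatically (ARNs are placeholders).
    import boto3

    scheduler = boto3.client("scheduler")
    scheduler.create_schedule(
        Name="BeyondTrustEPMLogExport-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:BeyondTrustEPMLogExport",
            "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeSchedulerInvokeLambda",
        },
    )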

Optional: Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::beyondtrust-epm-logs-bucket"
        }
      ]
    }
     
    
  7. Set the name to secops-reader-policy.

  8. Go to Create policy > search/select > Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV file (these values are entered into the feed).
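
Before entering the keys in the feed, you can verify that the reader credentials can list and read the log bucket. A sketch using the downloaded keys (placeholders shown):

    # Sketch: confirm the secops-reader keys can list and read the log bucket.
    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",          # from the downloaded CSV
        aws_secret_access_key="your-secret",  # from the downloaded CSV
    )

    resp = s3.list_objects_v2(Bucket="beyondtrust-epm-logs-bucket", MaxKeys=5)
    for obj in resp.get("Contents", []):
        print(obj["Key"])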

Set up feeds (both options)

To configure a feed, follow these steps:

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, BeyondTrust EPM logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select BeyondTrust Endpoint Privilege Management as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: The bucket URI (for example, s3://your-log-bucket-name/). Replace your-log-bucket-name with the actual name of the bucket.
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

UDM Mapping Table

Log Field                 UDM Mapping                                 Logic
agent.id                  principal.asset.attribute.labels.value      Mapped to label with key agent_id
agent.version             principal.asset.attribute.labels.value      Mapped to label with key agent_version
ecs.version               principal.asset.attribute.labels.value      Mapped to label with key ecs_version
event_data.reason         metadata.description                        Event description from raw log
event_datas.ActionId      metadata.product_log_id                     Product-specific log identifier
file.path                 principal.file.full_path                    Full file path from the event
headers.content_length    additional.fields.value.string_value        Mapped to label with key content_length
headers.content_type      additional.fields.value.string_value        Mapped to label with key content_type
headers.http_host         additional.fields.value.string_value        Mapped to label with key http_host
headers.http_version      network.application_protocol_version        HTTP protocol version
headers.request_method    network.http.method                         HTTP request method
host.hostname             principal.hostname                          Principal hostname
host.hostname             principal.asset.hostname                    Principal asset hostname
host.ip                   principal.asset.ip                          Principal asset IP address
host.ip                   principal.ip                                Principal IP address
host.mac                  principal.mac                               Principal MAC address
host.os.platform          principal.platform                          Set to MAC if equal to macOS
host.os.version           principal.platform_version                  Operating system version
labels.related_item_id    metadata.product_log_id                     Related item identifier
process.command_line      principal.process.command_line              Process command line
process.name              additional.fields.value.string_value        Mapped to label with key process_name
process.parent.name       additional.fields.value.string_value        Mapped to label with key process_parent_name
process.parent.pid        principal.process.parent_process.pid        Parent process PID converted to string
process.pid               principal.process.pid                       Process PID converted to string
user.id                   principal.user.userid                       User identifier
user.name                 principal.user.user_display_name            User display name
N/A                       metadata.event_timestamp                    Event timestamp set to log entry timestamp
N/A                       metadata.event_type                         GENERIC_EVENT if no principal, otherwise STATUS_UPDATE
N/A                       network.application_protocol                Set to HTTP if the http_version field contains HTTP

Need more help? Get answers from Community members and Google SecOps professionals.
