Collect BeyondTrust Endpoint Privilege Management (EPM) logs
This document explains how to ingest BeyondTrust Endpoint Privilege Management (EPM) logs to Google Security Operations using two different approaches: EC2-based collection and AWS Lambda-based collection using Amazon S3. The parser focuses on transforming raw JSON log data from BeyondTrust Endpoint into a structured format conforming to the Chronicle UDM. It first initializes default values for various fields and then parses the JSON payload, subsequently mapping specific fields from the raw log into corresponding UDM fields within the event.idm.read_only_udm
object.
Before you begin
Make sure you have the following prerequisites:
- Google SecOps instance
- Privileged access to BeyondTrust Endpoint Privilege Managementtenant or API
- Privileged access to AWS(S3, IAM, Lambda/EC2, EventBridge)
Choose your integration method
You can choose between two integration methods:
- Option 1: EC2-based collection: Uses an EC2 instance with scheduled scripts for log collection
- Option 2: AWS Lambda-based collection: Uses serverless Lambda functions with EventBridge scheduling
Option 1: EC2-based Collection
Configure AWS IAM for Google SecOps ingestion
- Create a Userfollowing this user guide: Creating an IAM user .
- Select the created User.
- Select the Security Credentialstab.
- Click Create Access Keyin the Access Keyssection.
- Select Third-party serviceas Use case.
- Click Next.
- Optional: Add a description tag.
- Click Create access key.
- Click Download CSV fileto save the Access Keyand Secret Access Keyfor future reference.
- Click Done.
- Select the Permissionstab.
- Click Add permissionsin the Permissions policiessection.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccesspolicy.
- Click Next.
- Click Add permissions.
Configure BeyondTrust EPM for API access
- Sign in to the BeyondTrust Privilege Management web consoleas an administrator.
- Go to Configuration > Settings > API Settings.
- Click Create an API Account.
- Provide the following configuration details:
- Name: Enter
Google SecOps Collector
. - API Access: Enable Audit (Read)and other scopes as required.
- Name: Enter
- Copy and save the Client IDand Client Secret.
- Copy your API base URL; it's typically
https://<your-tenant>-services.pm.beyondtrustcloud.com
(you'll use this as BPT_API_URL).
Create an AWS S3 Bucket
- Sign in to the AWS Management Console.
- Go to AWS Console > Services > S3 > Create bucket.
- Provide the following configuration details:
- Bucket name:
my-beyondtrust-logs
. - Region: [your choice] > Create.
- Bucket name:
Create an IAM Role for EC2
- Sign in to the AWS Management Console.
- Go to AWS Console > Services > IAM > Roles > Create role.
- Provide the following configuration details:
- Trusted entity: AWS service > EC2 > Next.
- Attach permission: AmazonS3FullAccess(or a scoped policy to your bucket) > Next.
- Role name:
EC2-S3-BPT-Writer
> Create role.
Launch and configure your EC2 Collector VM
- Sign in to the AWS Management Console.
- Go to Services.
- In the search bar, type EC2and select it.
- In the EC2 dashboard, click Instances.
- Click Launch instances.
- Provide the following configuration details:
- Name: Enter
BPT-Log-Collector
. - AMI: Select Ubuntu Server 22.04 LTS.
- Instance type: t3.micro(or larger), then click Next.
- Network: Make sure the Networksetting is set to your default VPC.
- IAM role: Select the EC2-S3-BPT-WriterIAM role from the menu.
- Auto-assign Public IP: Enable (or make sure you can reach it using VPN) > Next.
- Add Storage: Leave the default storage configuration (8 GiB), and then click Next.
- Select Create a new security group.
- Inbound rule: Click Add Rule.
- Type: Select SSH.
- Port: 22.
- Source: your IP
- Click Review and Launch.
- Select or create a key pair.
- Click Download Key Pair.
- Save the downloaded PEM file. You will need this file to connect to your instance using SSH.
- Name: Enter
- Connect to your Virtual Machine (VM) using SSH.
Install collector prerequisites
-
Run the following command:
chmod 400 ~/Downloads/your-key.pem ssh -i ~/Downloads/your-key.pem ubuntu@<EC2_PUBLIC_IP>
-
Update system and install dependencies:
# Update OS sudo apt update && sudo apt upgrade -y # Install Python, Git sudo apt install -y python3 python3-venv python3-pip git # Create & activate virtualenv python3 -m venv ~/bpt-venv source ~/bpt-venv/bin/activate # Install libraries pip install requests boto3
-
Create a directory & state file:
sudo mkdir -p /var/lib/bpt-collector sudo touch /var/lib/bpt-collector/last_run.txt sudo chown ubuntu:ubuntu /var/lib/bpt-collector/last_run.txt
-
Initialize it (for example, to 1 hour ago):
echo " $( date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ ) " > /var/lib/bpt-collector/last_run.txt
Deploy the BeyondTrust EPM Collector Script
-
Create a project folder:
mkdir ~/bpt-collector && cd ~/bpt-collector
-
Export the required environment variables (for example, in
~/.bashrc
):export BPT_API_URL = "https://<your-tenant>-services.pm.beyondtrustcloud.com" export BPT_CLIENT_ID = "your-client-id" export BPT_CLIENT_SECRET = "your-client-secret" export S3_BUCKET = "my-beyondtrust-logs" export S3_PREFIX = "bpt/" export STATE_FILE = "/var/lib/bpt-collector/last_run.txt" export RECORD_SIZE = "1000"
-
Create
collector_bpt.py
and enter the following code:#!/usr/bin/env python3 import os , sys , json , boto3 , requests from datetime import datetime , timezone , timedelta # ── UTILS ────────────────────────────────────────────────────────────── def must_env ( var ): val = os . getenv ( var ) if not val : print ( f "ERROR: environment variable { var } is required" , file = sys . stderr ) sys . exit ( 1 ) return val def ensure_state_file ( path ): d = os . path . dirname ( path ) if not os . path . isdir ( d ): os . makedirs ( d , exist_ok = True ) if not os . path . isfile ( path ): ts = ( datetime . now ( timezone . utc ) - timedelta ( hours = 1 )) . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) with open ( path , "w" ) as f : f . write ( ts ) # ── CONFIG ───────────────────────────────────────────────────────────── BPT_API_URL = must_env ( "BPT_API_URL" ) # e.g., https://tenant-services.pm.beyondtrustcloud.com CLIENT_ID = must_env ( "BPT_CLIENT_ID" ) CLIENT_SECRET = must_env ( "BPT_CLIENT_SECRET" ) S3_BUCKET = must_env ( "S3_BUCKET" ) S3_PREFIX = os . getenv ( "S3_PREFIX" , "" ) # e.g., "bpt/" STATE_FILE = os . getenv ( "STATE_FILE" , "/var/lib/bpt-collector/last_run.txt" ) RECORD_SIZE = int ( os . getenv ( "RECORD_SIZE" , "1000" )) # ── END CONFIG ───────────────────────────────────────────────────────── ensure_state_file ( STATE_FILE ) def read_last_run (): with open ( STATE_FILE , "r" ) as f : ts = f . read () . strip () return datetime . fromisoformat ( ts . replace ( "Z" , "+00:00" )) def write_last_run ( dt ): with open ( STATE_FILE , "w" ) as f : f . write ( dt . strftime ( "%Y-%m- %d T%H:%M:%SZ" )) def get_oauth_token (): """ Get OAuth2 token using client credentials flow Scope: urn:management:api (for EPM Management API access) """ resp = requests . post ( f " { BPT_API_URL } /oauth/connect/token" , headers = { "Content-Type" : "application/x-www-form-urlencoded" }, data = { "grant_type" : "client_credentials" , "client_id" : CLIENT_ID , "client_secret" : CLIENT_SECRET , "scope" : "urn:management:api" } ) resp . raise_for_status () return resp . json ()[ "access_token" ] def extract_event_timestamp ( evt ): """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance ( evt , dict ) and isinstance ( evt . get ( "event" ), dict ): ts = evt [ "event" ] . get ( "ingested" ) if ts : return ts # Fallbacks for other timestamp fields timestamp_fields = [ "timestamp" , "eventTime" , "dateTime" , "whenOccurred" , "date" , "time" ] for field in timestamp_fields : if field in evt and evt [ field ]: return evt [ field ] return None def parse_timestamp ( ts ): """ Parse timestamp handling various formats """ from datetime import datetime , timezone if isinstance ( ts , ( int , float )): # Handle milliseconds vs seconds return datetime . fromtimestamp ( ts / 1000 if ts > 1e12 else ts , tz = timezone . utc ) if isinstance ( ts , str ): if ts . endswith ( "Z" ): return datetime . fromisoformat ( ts . replace ( "Z" , "+00:00" )) dt = datetime . fromisoformat ( ts ) return dt if dt . tzinfo else dt . replace ( tzinfo = timezone . utc ) raise ValueError ( f "Unsupported timestamp: { ts !r} " ) def fetch_events ( token , start_date_iso ): """ Fetch events using the correct EPM API endpoint: /management-api/v2/Events/FromStartDate This endpoint uses StartDate and RecordSize parameters, not startTime/endTime/limit/offset """ headers = { "Authorization" : f "Bearer { token } " , "Accept" : "application/json" } all_events , current_start = [], start_date_iso # Enforce maximum RecordSize limit of 1000 record_size_limited = min ( RECORD_SIZE , 1000 ) for _ in range ( 10 ): # MAX 10 iterations to prevent infinite loops # Use the correct endpoint and parameters params = { "StartDate" : current_start_date , "RecordSize" : RECORD_SIZE } resp = requests . get ( f " { BPT_API_URL } /management-api/v2/Events/FromStartDate" , headers = headers , params = { "StartDate" : current_start_date , "RecordSize" : min ( RECORD_SIZE , 1000 ) }, timeout = 300 ) resp . raise_for_status () data = resp . json () events = data . get ( "events" , []) if not events : break all_events . extend ( events ) iterations += 1 # If we got fewer events than RECORD_SIZE, we're done if len ( events ) < RECORD_SIZE : break # For pagination, update StartDate to the timestamp of the last event last_event = events [ - 1 ] last_timestamp = extract_event_timestamp ( last_event ) if not last_timestamp : print ( "Warning: Could not find timestamp in last event for pagination" ) break # Convert to ISO format if needed and increment slightly to avoid duplicates try : dt = parse_timestamp ( last_timestamp ) # Add 1 second to avoid retrieving the same event again dt = dt + timedelta ( seconds = 1 ) current_start = dt . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) except Exception as e : print ( f "Error parsing timestamp { last_timestamp } : { e } " ) break return all_events def upload_to_s3 ( obj , key ): boto3 . client ( "s3" ) . put_object ( Bucket = S3_BUCKET , Key = key , Body = json . dumps ( obj ) . encode ( "utf-8" ), ContentType = "application/json" ) def main (): # 1) determine window start_dt = read_last_run () end_dt = datetime . now ( timezone . utc ) START = start_dt . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) END = end_dt . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) print ( f "Fetching events from { START } to { END } " ) # 2) authenticate and fetch try : token = get_oauth_token () events = fetch_events ( token , START ) # Filter events to only include those before our end time filtered_events = [] for evt in events : evt_time = extract_event_timestamp ( evt ) if evt_time : try : evt_dt = parse_timestamp ( evt_time ) if evt_dt < = end_dt : filtered_events . append ( evt ) except Exception as e : print ( f "Error parsing event timestamp { evt_time } : { e } " ) # Include event anyway if timestamp parsing fails filtered_events . append ( evt ) else : # Include events without timestamps filtered_events . append ( evt ) count = len ( filtered_events ) if count > 0 : # Upload events to S3 timestamp_str = end_dt . strftime ( '%Y%m %d _%H%M%S' ) for idx , evt in enumerate ( filtered_events , start = 1 ): key = f " { S3_PREFIX }{ end_dt . strftime ( '%Y/%m/ %d ' ) } /evt_ { timestamp_str } _ { idx : 06d } .json" upload_to_s3 ( evt , key ) print ( f "Uploaded { count } events to S3" ) else : print ( "No events to upload" ) # 3) persist state write_last_run ( end_dt ) except Exception as e : print ( f "Error: { e } " ) sys . exit ( 1 ) if __name__ == "__main__" : main ()
-
Make it executable:
chmod +x collector_bpt.py
Schedule Daily with Cron
-
Run the following command:
crontab -e
-
Add the daily job at midnight UTC:
0 0 * * * cd ~/bpt-collector && source ~/bpt-venv/bin/activate && ./collector_bpt.py
Option 2: AWS Lambda-based Collection
Collect BeyondTrust EPM prerequisites
- Sign in to the BeyondTrust Privilege Management web consoleas an administrator.
- Go to System Configuration > REST API > Tokens.
- Click Add Token.
- Provide the following configuration details:
- Name: Enter
Google SecOps Collector
. - Scopes: Select Audit:Readand other scopes as required.
- Name: Enter
- Click Saveand copy the token value.
- Copy and save in a secure location the following details:
- API Base URL: Your BeyondTrust EPM API URL (for example,
https://yourtenant-services.pm.beyondtrustcloud.com
). - Client ID: From your OAuth application configuration.
- Client Secret: From your OAuth application configuration.
- API Base URL: Your BeyondTrust EPM API URL (for example,
Configure AWS S3 bucket and IAM for Google SecOps
- Create Amazon S3 bucketfollowing this user guide: Creating a bucket .
- Save bucket Nameand Regionfor future reference (for example,
beyondtrust-epm-logs-bucket
). - Create a user following this user guide: Creating an IAM user .
- Select the created User.
- Select the Security credentialstab.
- Click Create Access Keyin the Access Keyssection.
- Select Third-party serviceas the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV fileto save the Access Keyand Secret Access Keyfor later use.
- Click Done.
- Select the Permissionstab.
- Click Add permissionsin the Permissions policiessection.
- Select Add permissions.
- Select Attach policies directly
- Search for and select the AmazonS3FullAccesspolicy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
-
Copy and paste the following policy:
{ "Version" : "2012-10-17" , "Statement" : [ { "Sid" : "AllowPutObjects" , "Effect" : "Allow" , "Action" : "s3:PutObject" , "Resource" : "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Sid" : "AllowGetStateObject" , "Effect" : "Allow" , "Action" : "s3:GetObject" , "Resource" : "arn:aws:s3:::beyondtrust-epm-logs-bucket/beyondtrust-epm-logs/state.json" } ] }
- Replace
beyondtrust-epm-logs-bucket
if you entered a different bucket name.
- Replace
-
Click Next > Create policy.
-
Go to IAM > Roles > Create role > AWS service > Lambda.
-
Attach the newly created policy and the managed policy AWSLambdaBasicExecutionRole(for CloudWatch logging).
-
Name the role
BeyondTrustEPMLogExportRole
and click Create role.
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:
Setting | Value |
---|---|
Name | BeyondTrustEPMLogExport
|
Runtime | Python 3.13 |
Architecture | x86_64 |
Execution role | BeyondTrustEPMLogExportRole
|
-
After the function is created, open the Codetab, delete the stub and enter the following code (
BeyondTrustEPMLogExport.py
):import json import boto3 import urllib3 import base64 from datetime import datetime , timedelta , timezone import os from typing import Dict , List , Optional # Initialize urllib3 pool manager http = urllib3 . PoolManager () def lambda_handler ( event , context ): """ Lambda function to fetch BeyondTrust EPM audit events and store them in S3 """ # Environment variables S3_BUCKET = os . environ [ 'S3_BUCKET' ] S3_PREFIX = os . environ [ 'S3_PREFIX' ] STATE_KEY = os . environ [ 'STATE_KEY' ] # BeyondTrust EPM API credentials BPT_API_URL = os . environ [ 'BPT_API_URL' ] CLIENT_ID = os . environ [ 'CLIENT_ID' ] CLIENT_SECRET = os . environ [ 'CLIENT_SECRET' ] OAUTH_SCOPE = os . environ . get ( 'OAUTH_SCOPE' , 'urn:management:api' ) # Optional parameters RECORD_SIZE = int ( os . environ . get ( 'RECORD_SIZE' , '1000' )) MAX_ITERATIONS = int ( os . environ . get ( 'MAX_ITERATIONS' , '10' )) s3_client = boto3 . client ( 's3' ) try : # Get last execution state last_timestamp = get_last_state ( s3_client , S3_BUCKET , STATE_KEY ) # Get OAuth access token access_token = get_oauth_token ( BPT_API_URL , CLIENT_ID , CLIENT_SECRET , OAUTH_SCOPE ) # Fetch audit events events = fetch_audit_events ( BPT_API_URL , access_token , last_timestamp , RECORD_SIZE , MAX_ITERATIONS ) if events : # Store events in S3 current_timestamp = datetime . utcnow () filename = f " { S3_PREFIX } beyondtrust-epm-events- { current_timestamp . strftime ( '%Y%m %d _%H%M%S' ) } .json" store_events_to_s3 ( s3_client , S3_BUCKET , filename , events ) # Update state with latest timestamp latest_timestamp = get_latest_event_timestamp ( events ) update_state ( s3_client , S3_BUCKET , STATE_KEY , latest_timestamp ) print ( f "Successfully processed { len ( events ) } events and stored to { filename } " ) else : print ( "No new events found" ) return { 'statusCode' : 200 , 'body' : json . dumps ( f 'Successfully processed { len ( events ) if events else 0 } events' ) } except Exception as e : print ( f "Error processing BeyondTrust EPM logs: { str ( e ) } " ) return { 'statusCode' : 500 , 'body' : json . dumps ( f 'Error: { str ( e ) } ' ) } def get_oauth_token ( api_url : str , client_id : str , client_secret : str , scope : str = "urn:management:api" ) - > str : """ Get OAuth access token using client credentials flow for BeyondTrust EPM Uses the correct scope: urn:management:api and /oauth/connect/token endpoint """ token_url = f " { api_url } /oauth/connect/token" headers = { 'Content-Type' : 'application/x-www-form-urlencoded' } body = f "grant_type=client_credentials&client_id= { client_id } & client_secret= { client_secret } & scope= { scope } " response = http . request ( 'POST' , token_url , headers = headers , body = body , timeout = urllib3 . Timeout ( 60.0 )) if response . status != 200 : raise RuntimeError ( f "Token request failed: { response . status } { response . data [: 256 ] !r} " ) token_data = json . loads ( response . data . decode ( 'utf-8' )) return token_data [ 'access_token' ] def fetch_audit_events ( api_url : str , access_token : str , last_timestamp : Optional [ str ], record_size : int , max_iterations : int ) - > List [ Dict ]: """ Fetch audit events using the correct BeyondTrust EPM API endpoint: /management-api/v2/Events/FromStartDate with StartDate and RecordSize parameters """ headers = { 'Authorization' : f 'Bearer { access_token } ' , 'Content-Type' : 'application/json' } all_events = [] current_start_date = last_timestamp or ( datetime . utcnow () - timedelta ( hours = 24 )) . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) iterations = 0 # Enforce maximum RecordSize limit of 1000 record_size_limited = min ( record_size , 1000 ) while iterations < max_iterations : # Use the correct EPM API endpoint and parameters query_url = f " { api_url } /management-api/v2/Events/FromStartDate" params = { 'StartDate' : current_start_date , 'RecordSize' : record_size_limited } response = http . request ( 'GET' , query_url , headers = headers , fields = params , timeout = urllib3 . Timeout ( 300.0 )) if response . status != 200 : raise RuntimeError ( f "API request failed: { response . status } { response . data [: 256 ] !r} " ) response_data = json . loads ( response . data . decode ( 'utf-8' )) events = response_data . get ( 'events' , []) if not events : break all_events . extend ( events ) iterations += 1 # If we got fewer events than RecordSize, we've reached the end if len ( events ) < record_size_limited : break # For pagination, update StartDate to the timestamp of the last event last_event = events [ - 1 ] last_timestamp = extract_event_timestamp ( last_event ) if not last_timestamp : print ( "Warning: Could not find timestamp in last event for pagination" ) break # Convert to datetime and add 1 second to avoid retrieving the same event again try : dt = parse_timestamp ( last_timestamp ) dt = dt + timedelta ( seconds = 1 ) current_start_date = dt . strftime ( "%Y-%m- %d T%H:%M:%SZ" ) except Exception as e : print ( f "Error parsing timestamp { last_timestamp } : { e } " ) break return all_events def extract_event_timestamp ( event : Dict ) - > Optional [ str ]: """ Extract timestamp from event, prioritizing event.ingested field """ # Primary (documented) path: event.ingested if isinstance ( event , dict ) and isinstance ( event . get ( "event" ), dict ): ts = event [ "event" ] . get ( "ingested" ) if ts : return ts # Fallbacks for other timestamp fields timestamp_fields = [ 'timestamp' , 'eventTime' , 'dateTime' , 'whenOccurred' , 'date' , 'time' ] for field in timestamp_fields : if field in event and event [ field ]: return event [ field ] return None def parse_timestamp ( timestamp_str : str ) - > datetime : """ Parse timestamp string to datetime object, handling various formats """ if isinstance ( timestamp_str , ( int , float )): # Unix timestamp (in milliseconds or seconds) if timestamp_str > 1e12 : # Milliseconds return datetime . fromtimestamp ( timestamp_str / 1000 , tz = timezone . utc ) else : # Seconds return datetime . fromtimestamp ( timestamp_str , tz = timezone . utc ) if isinstance ( timestamp_str , str ): # Try different string formats try : # ISO format with Z if timestamp_str . endswith ( 'Z' ): return datetime . fromisoformat ( timestamp_str . replace ( 'Z' , '+00:00' )) # ISO format with timezone elif '+' in timestamp_str or timestamp_str . endswith ( '00:00' ): return datetime . fromisoformat ( timestamp_str ) # ISO format without timezone (assume UTC) else : dt = datetime . fromisoformat ( timestamp_str ) if dt . tzinfo is None : dt = dt . replace ( tzinfo = timezone . utc ) return dt except ValueError : pass raise ValueError ( f "Could not parse timestamp: { timestamp_str } " ) def get_last_state ( s3_client , bucket : str , state_key : str ) - > Optional [ str ]: """ Get the last processed timestamp from S3 state file """ try : response = s3_client . get_object ( Bucket = bucket , Key = state_key ) state_data = json . loads ( response [ 'Body' ] . read () . decode ( 'utf-8' )) return state_data . get ( 'last_timestamp' ) except s3_client . exceptions . NoSuchKey : print ( "No previous state found, starting from 24 hours ago" ) return None except Exception as e : print ( f "Error reading state: { e } " ) return None def update_state ( s3_client , bucket : str , state_key : str , timestamp : str ): """ Update the state file with the latest processed timestamp """ state_data = { 'last_timestamp' : timestamp , 'updated_at' : datetime . utcnow () . isoformat () + 'Z' } s3_client . put_object ( Bucket = bucket , Key = state_key , Body = json . dumps ( state_data ), ContentType = 'application/json' ) def store_events_to_s3 ( s3_client , bucket : str , key : str , events : List [ Dict ]): """ Store events as JSONL (one JSON object per line) in S3 """ # Convert to JSONL format (one JSON object per line) jsonl_content = 'n' . join ( json . dumps ( event , default = str ) for event in events ) s3_client . put_object ( Bucket = bucket , Key = key , Body = jsonl_content , ContentType = 'application/x-ndjson' ) def get_latest_event_timestamp ( events : List [ Dict ]) - > str : """ Get the latest timestamp from the events for state tracking """ if not events : return datetime . utcnow () . isoformat () + 'Z' latest = None for event in events : timestamp = extract_event_timestamp ( event ) if timestamp : try : event_dt = parse_timestamp ( timestamp ) event_iso = event_dt . isoformat () + 'Z' if latest is None or event_iso > latest : latest = event_iso except Exception as e : print ( f "Error parsing event timestamp { timestamp } : { e } " ) continue return latest or datetime . utcnow () . isoformat () + 'Z'
-
Go to Configuration > Environment variables > Edit > Add new environment variable.
-
Enter the following environment variables, replacing with your values.
Key Example value S3_BUCKET
beyondtrust-epm-logs-bucket
S3_PREFIX
beyondtrust-epm-logs/
STATE_KEY
beyondtrust-epm-logs/state.json
BPT_API_URL
https://yourtenant-services.pm.beyondtrustcloud.com
CLIENT_ID
your-client-id
CLIENT_SECRET
your-client-secret
OAUTH_SCOPE
urn:management:api
RECORD_SIZE
1000
MAX_ITERATIONS
10
-
After the function is created, stay on its page (or open Lambda > Functions > your-function).
-
Select the Configurationtab.
-
In the General configurationpanel click Edit.
-
Change Timeoutto 5 minutes (300 seconds)and click Save.
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
- Recurring schedule: Rate(
1 hour
). - Target: your Lambda function
BeyondTrustEPMLogExport
. - Name:
BeyondTrustEPMLogExport-1h
.
- Recurring schedule: Rate(
- Click Create schedule.
Optional: Create read-only IAM user & keys for Google SecOps
- Go to AWS Console > IAM > Users > Add users.
- Click Add users.
- Provide the following configuration details:
- User: Enter
secops-reader
. - Access type: Select Access key – Programmatic access.
- User: Enter
- Click Create user.
- Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
-
In the JSON editor, enter the following policy:
{ "Version" : "2012-10-17" , "Statement" : [ { "Effect" : "Allow" , "Action" : [ "s3:GetObject" ], "Resource" : "arn:aws:s3:::beyondtrust-epm-logs-bucket/*" }, { "Effect" : "Allow" , "Action" : [ "s3:ListBucket" ], "Resource" : "arn:aws:s3:::beyondtrust-epm-logs-bucket" } ] }
-
Set the name to
secops-reader-policy
. -
Go to Create policy > search/select > Next > Add permissions.
-
Go to Security credentials > Access keys > Create access key.
-
Download the CSV(these values are entered into the feed).
Set up feeds (both options)
To configure a feed, follow these steps:
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed namefield, enter a name for the feed (for example,
BeyondTrust EPM logs
). - Select Amazon S3 V2as the Source type.
- Select BeyondTrust Endpoint Privilege Managementas the Log type.
- Click Next.
- Specify values for the following input parameters:
- S3 URI: The bucket URI
-
s3://your-log-bucket-name/
. Replaceyour-log-bucket-name
with the actual name of the bucket.
-
- Source deletion options: Select deletion option according to your preference.
- Maximum File Age: Include files modified in the last number of days. Default is 180 days.
- Access Key ID: User access key with access to the S3 bucket.
- Secret Access Key: User secret key with access to the S3 bucket.
- Asset namespace: The asset namespace .
- Ingestion labels: The label applied to the events from this feed.
- S3 URI: The bucket URI
- Click Next.
- Review your new feed configuration in the Finalizescreen, and then click Submit.
UDM Mapping Table
Log Field | UDM Mapping | Logic |
---|---|---|
agent.id
|
principal.asset.attribute.labels.value
|
Mapped to label with key agent_id
|
agent.version
|
principal.asset.attribute.labels.value
|
Mapped to label with key agent_version
|
ecs.version
|
principal.asset.attribute.labels.value
|
Mapped to label with key ecs_version
|
event_data.reason
|
metadata.description
|
Event description from raw log |
event_datas.ActionId
|
metadata.product_log_id
|
Product-specific log identifier |
file.path
|
principal.file.full_path
|
Full file path from the event |
headers.content_length
|
additional.fields.value.string_value
|
Mapped to label with key content_length
|
headers.content_type
|
additional.fields.value.string_value
|
Mapped to label with key content_type
|
headers.http_host
|
additional.fields.value.string_value
|
Mapped to label with key http_host
|
headers.http_version
|
network.application_protocol_version
|
HTTP protocol version |
headers.request_method
|
network.http.method
|
HTTP request method |
host.hostname
|
principal.hostname
|
Principal hostname |
host.hostname
|
principal.asset.hostname
|
Principal asset hostname |
host.ip
|
principal.asset.ip
|
Principal asset IP address |
host.ip
|
principal.ip
|
Principal IP address |
host.mac
|
principal.mac
|
Principal MAC address |
host.os.platform
|
principal.platform
|
Set to MAC if equals macOS |
host.os.version
|
principal.platform_version
|
Operating system version |
labels.related_item_id
|
metadata.product_log_id
|
Related item identifier |
process.command_line
|
principal.process.command_line
|
Process command line |
process.name
|
additional.fields.value.string_value
|
Mapped to label with key process_name
|
process.parent.name
|
additional.fields.value.string_value
|
Mapped to label with key process_parent_name
|
process.parent.pid
|
principal.process.parent_process.pid
|
Parent process PID converted to string |
process.pid
|
principal.process.pid
|
Process PID converted to string |
user.id
|
principal.user.userid
|
User identifier |
user.name
|
principal.user.user_display_name
|
User display name |
N/A
|
metadata.event_timestamp
|
Event timestamp set to log entry timestamp |
N/A
|
metadata.event_type
|
GENERIC_EVENT if no principal, otherwise STATUS_UPDATE |
N/A
|
network.application_protocol
|
Set to HTTP if http_version field contains HTTP |
Need more help? Get answers from Community members and Google SecOps professionals.