Collect Citrix Monitor Service logs

Supported in:

This document explains how to ingest Citrix Monitor Service logs to Google Security Operations using Amazon S3. The parser transforms raw JSON formatted logs into a structured format conforming to the Google SecOps UDM. It extracts relevant fields from the raw log, maps them to corresponding UDM fields, and enriches the data with additional context like user information, machine details, and network activity.

Before you begin

Make sure you have the following prerequisites:

  • Google SecOps instance
  • Privileged access to Citrix Cloudtenant
  • Privileged access to AWS(S3, IAM, Lambda, EventBridge)

Collect Citrix Monitor Service prerequisites (IDs, API keys, org IDs, tokens)

  1. Sign in to the Citrix Cloud Console.
  2. Go to Identity and Access Management > API Access.
  3. Click Create Client.
  4. Copy and save in a secure location the following details:
    • Client ID
    • Client Secret
    • Customer ID(visible in Citrix Cloud console)
    • API Base URL:
      • Global: https://api.cloud.com
      • Japan: https://api.citrixcloud.jp

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create Amazon S3 bucketfollowing this user guide: Creating a bucket
  2. Save bucket Nameand Regionfor future reference (for example, citrix-monitor-logs ).
  3. Create a user following this user guide: Creating an IAM user .
  4. Select the created User.
  5. Select the Security credentialstab.
  6. Click Create Access Keyin the Access Keyssection.
  7. Select Third-party serviceas the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV fileto save the Access Keyand Secret Access Keyfor later use.
  12. Click Done.
  13. Select the Permissionstab.
  14. Click Add permissionsin the Permissions policiessection.
  15. Select Add permissions.
  16. Select Attach policies directly
  17. Search for and select the AmazonS3FullAccesspolicy.
  18. Click Next.
  19. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy:

      { 
      
     "Version" 
     : 
      
     "2012-10-17" 
     , 
      
     "Statement" 
     : 
      
     [ 
      
     { 
      
     "Sid" 
     : 
      
     "AllowPutObjects" 
     , 
      
     "Effect" 
     : 
      
     "Allow" 
     , 
      
     "Action" 
     : 
      
     "s3:PutObject" 
     , 
      
     "Resource" 
     : 
      
     "arn:aws:s3:::citrix-monitor-logs/*" 
      
     }, 
      
     { 
      
     "Sid" 
     : 
      
     "AllowGetStateObject" 
     , 
      
     "Effect" 
     : 
      
     "Allow" 
     , 
      
     "Action" 
     : 
      
     "s3:GetObject" 
     , 
      
     "Resource" 
     : 
      
     "arn:aws:s3:::citrix-monitor-logs/citrix_monitor/state.json" 
      
     } 
      
     ] 
     } 
     
    
    • Replace citrix-monitor-logs if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy and the AWSLambdaBasicExecutionRolemanaged policy.

  6. Name the role CitrixMonitorLambdaRole and click Create role.

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting Value
    Name CitrixMonitorCollector
    Runtime Python 3.13
    Architecture x86_64
    Execution role CitrixMonitorLambdaRole
  4. After the function is created, open the Codetab, delete the stub and enter the following code ( CitrixMonitorCollector.py ):

      import 
      
     os 
     import 
      
     json 
     import 
      
     uuid 
     import 
      
     datetime 
     import 
      
     urllib.parse 
     import 
      
     urllib.request 
     import 
      
     urllib.error 
     import 
      
     boto3 
     import 
      
     botocore 
     # Citrix Cloud OAuth2 endpoint template 
     TOKEN_URL_TMPL 
     = 
     " 
     {api_base} 
     /cctrustoauth2/ 
     {customerid} 
     /tokens/clients" 
     DEFAULT_API_BASE 
     = 
     "https://api.cloud.com" 
     MONITOR_BASE_PATH 
     = 
     "/monitorodata" 
     s3 
     = 
     boto3 
     . 
     client 
     ( 
     "s3" 
     ) 
     def 
      
     http_post_form 
     ( 
     url 
     , 
     data_dict 
     ): 
      
     """POST form data to get authentication token.""" 
     data 
     = 
     urllib 
     . 
     parse 
     . 
     urlencode 
     ( 
     data_dict 
     ) 
     . 
     encode 
     ( 
     "utf-8" 
     ) 
     req 
     = 
     urllib 
     . 
     request 
     . 
     Request 
     ( 
     url 
     , 
     data 
     = 
     data 
     , 
     headers 
     = 
     { 
     "Accept" 
     : 
     "application/json" 
     , 
     "Content-Type" 
     : 
     "application/x-www-form-urlencoded" 
     , 
     }) 
     with 
     urllib 
     . 
     request 
     . 
     urlopen 
     ( 
     req 
     , 
     timeout 
     = 
     45 
     ) 
     as 
     resp 
     : 
     return 
     json 
     . 
     loads 
     ( 
     resp 
     . 
     read 
     () 
     . 
     decode 
     ( 
     "utf-8" 
     )) 
     def 
      
     http_get_json 
     ( 
     url 
     , 
     headers 
     ): 
      
     """GET JSON data from API endpoint.""" 
     req 
     = 
     urllib 
     . 
     request 
     . 
     Request 
     ( 
     url 
     , 
     headers 
     = 
     headers 
     ) 
     with 
     urllib 
     . 
     request 
     . 
     urlopen 
     ( 
     req 
     , 
     timeout 
     = 
     90 
     ) 
     as 
     resp 
     : 
     return 
     json 
     . 
     loads 
     ( 
     resp 
     . 
     read 
     () 
     . 
     decode 
     ( 
     "utf-8" 
     )) 
     def 
      
     get_citrix_token 
     ( 
     api_base 
     , 
     customer_id 
     , 
     client_id 
     , 
     client_secret 
     ): 
      
     """Get Citrix Cloud authentication token.""" 
     url 
     = 
     TOKEN_URL_TMPL 
     . 
     format 
     ( 
     api_base 
     = 
     api_base 
     . 
     rstrip 
     ( 
     "/" 
     ), 
     customerid 
     = 
     customer_id 
     ) 
     payload 
     = 
     { 
     "grant_type" 
     : 
     "client_credentials" 
     , 
     "client_id" 
     : 
     client_id 
     , 
     "client_secret" 
     : 
     client_secret 
     , 
     } 
     response 
     = 
     http_post_form 
     ( 
     url 
     , 
     payload 
     ) 
     return 
     response 
     [ 
     "access_token" 
     ] 
     def 
      
     build_entity_url 
     ( 
     api_base 
     , 
     entity 
     , 
     filter_query 
     = 
     None 
     , 
     top 
     = 
     None 
     ): 
      
     """Build OData URL with optional filter and pagination.""" 
     base 
     = 
     api_base 
     . 
     rstrip 
     ( 
     "/" 
     ) 
     + 
     MONITOR_BASE_PATH 
     + 
     "/" 
     + 
     entity 
     params 
     = 
     [] 
     if 
     filter_query 
     : 
     params 
     . 
     append 
     ( 
     "$filter=" 
     + 
     urllib 
     . 
     parse 
     . 
     quote 
     ( 
     filter_query 
     , 
     safe 
     = 
     "()= ':-TZ0123456789" 
     )) 
     if 
     top 
     : 
     params 
     . 
     append 
     ( 
     "$top=" 
     + 
     str 
     ( 
     top 
     )) 
     return 
     base 
     + 
     ( 
     "?" 
     + 
     "&" 
     . 
     join 
     ( 
     params 
     ) 
     if 
     params 
     else 
     "" 
     ) 
     def 
      
     fetch_entity_rows 
     ( 
     entity 
     , 
     start_iso 
     = 
     None 
     , 
     end_iso 
     = 
     None 
     , 
     page_size 
     = 
     1000 
     , 
     headers 
     = 
     None 
     , 
     api_base 
     = 
     DEFAULT_API_BASE 
     ): 
      
     """Fetch entity data with optional time filtering and pagination.""" 
     # Try ModifiedDate filter if timestamps are provided 
     first_url 
     = 
     None 
     if 
     start_iso 
     and 
     end_iso 
     : 
     filter_query 
     = 
     f 
     "(ModifiedDate ge 
     { 
     start_iso 
     } 
     and ModifiedDate lt 
     { 
     end_iso 
     } 
     )" 
     first_url 
     = 
     build_entity_url 
     ( 
     api_base 
     , 
     entity 
     , 
     filter_query 
     , 
     page_size 
     ) 
     else 
     : 
     first_url 
     = 
     build_entity_url 
     ( 
     api_base 
     , 
     entity 
     , 
     None 
     , 
     page_size 
     ) 
     url 
     = 
     first_url 
     while 
     url 
     : 
     try 
     : 
     data 
     = 
     http_get_json 
     ( 
     url 
     , 
     headers 
     ) 
     items 
     = 
     data 
     . 
     get 
     ( 
     "value" 
     , 
     []) 
     for 
     item 
     in 
     items 
     : 
     yield 
     item 
     url 
     = 
     data 
     . 
     get 
     ( 
     "@odata.nextLink" 
     ) 
     except 
     urllib 
     . 
     error 
     . 
     HTTPError 
     as 
     e 
     : 
     # If ModifiedDate filtering fails, fall back to unfiltered query 
     if 
     e 
     . 
     code 
     == 
     400 
     and 
     start_iso 
     and 
     end_iso 
     : 
     print 
     ( 
     f 
     "ModifiedDate filter not supported for 
     { 
     entity 
     } 
     , falling back to unfiltered query" 
     ) 
     url 
     = 
     build_entity_url 
     ( 
     api_base 
     , 
     entity 
     , 
     None 
     , 
     page_size 
     ) 
     continue 
     else 
     : 
     raise 
     def 
      
     read_state_file 
     ( 
     bucket 
     , 
     key 
     ): 
      
     """Read the last processed timestamp from S3 state file.""" 
     try 
     : 
     obj 
     = 
     s3 
     . 
     get_object 
     ( 
     Bucket 
     = 
     bucket 
     , 
     Key 
     = 
     key 
     ) 
     content 
     = 
     obj 
     [ 
     "Body" 
     ] 
     . 
     read 
     () 
     . 
     decode 
     ( 
     "utf-8" 
     ) 
     state 
     = 
     json 
     . 
     loads 
     ( 
     content 
     ) 
     timestamp_str 
     = 
     state 
     . 
     get 
     ( 
     "last_hour_utc" 
     ) 
     if 
     timestamp_str 
     : 
     return 
     datetime 
     . 
     datetime 
     . 
     fromisoformat 
     ( 
     timestamp_str 
     . 
     replace 
     ( 
     "Z" 
     , 
     "+00:00" 
     )) 
     . 
     replace 
     ( 
     tzinfo 
     = 
     None 
     ) 
     except 
     botocore 
     . 
     exceptions 
     . 
     ClientError 
     as 
     e 
     : 
     if 
     e 
     . 
     response 
     [ 
     "Error" 
     ][ 
     "Code" 
     ] 
     == 
     "NoSuchKey" 
     : 
     return 
     None 
     raise 
     return 
     None 
     def 
      
     write_state_file 
     ( 
     bucket 
     , 
     key 
     , 
     dt_utc 
     ): 
      
     """Write the current processed timestamp to S3 state file.""" 
     state 
     = 
     { 
     "last_hour_utc" 
     : 
     dt_utc 
     . 
     isoformat 
     () 
     + 
     "Z" 
     } 
     s3 
     . 
     put_object 
     ( 
     Bucket 
     = 
     bucket 
     , 
     Key 
     = 
     key 
     , 
     Body 
     = 
     json 
     . 
     dumps 
     ( 
     state 
     , 
     separators 
     = 
     ( 
     "," 
     , 
     ":" 
     )), 
     ContentType 
     = 
     "application/json" 
     ) 
     def 
      
     write_ndjson_to_s3 
     ( 
     bucket 
     , 
     key 
     , 
     rows 
     ): 
      
     """Write rows as NDJSON to S3.""" 
     body_lines 
     = 
     [] 
     for 
     row 
     in 
     rows 
     : 
     json_line 
     = 
     json 
     . 
     dumps 
     ( 
     row 
     , 
     separators 
     = 
     ( 
     "," 
     , 
     ":" 
     ), 
     ensure_ascii 
     = 
     False 
     ) 
     body_lines 
     . 
     append 
     ( 
     json_line 
     ) 
     body 
     = 
     ( 
     "n" 
     . 
     join 
     ( 
     body_lines 
     ) 
     + 
     "n" 
     ) 
     . 
     encode 
     ( 
     "utf-8" 
     ) 
     s3 
     . 
     put_object 
     ( 
     Bucket 
     = 
     bucket 
     , 
     Key 
     = 
     key 
     , 
     Body 
     = 
     body 
     , 
     ContentType 
     = 
     "application/x-ndjson" 
     ) 
     def 
      
     lambda_handler 
     ( 
     event 
     , 
     context 
     ): 
      
     """Main Lambda handler function.""" 
     # Environment variables 
     bucket 
     = 
     os 
     . 
     environ 
     [ 
     "S3_BUCKET" 
     ] 
     prefix 
     = 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "S3_PREFIX" 
     , 
     "citrix_monitor" 
     ) 
     . 
     strip 
     ( 
     "/" 
     ) 
     state_key 
     = 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "STATE_KEY" 
     ) 
     or 
     f 
     " 
     { 
     prefix 
     } 
     /state.json" 
     customer_id 
     = 
     os 
     . 
     environ 
     [ 
     "CITRIX_CUSTOMER_ID" 
     ] 
     client_id 
     = 
     os 
     . 
     environ 
     [ 
     "CITRIX_CLIENT_ID" 
     ] 
     client_secret 
     = 
     os 
     . 
     environ 
     [ 
     "CITRIX_CLIENT_SECRET" 
     ] 
     api_base 
     = 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "API_BASE" 
     , 
     DEFAULT_API_BASE 
     ) 
     entities 
     = 
     [ 
     e 
     . 
     strip 
     () 
     for 
     e 
     in 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "ENTITIES" 
     , 
     "Machines,Sessions,Connections,Applications,Users" 
     ) 
     . 
     split 
     ( 
     "," 
     ) 
     if 
     e 
     . 
     strip 
     ()] 
     page_size 
     = 
     int 
     ( 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "PAGE_SIZE" 
     , 
     "1000" 
     )) 
     lookback_minutes 
     = 
     int 
     ( 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "LOOKBACK_MINUTES" 
     , 
     "75" 
     )) 
     use_time_filter 
     = 
     os 
     . 
     environ 
     . 
     get 
     ( 
     "USE_TIME_FILTER" 
     , 
     "true" 
     ) 
     . 
     lower 
     () 
     == 
     "true" 
     # Time window calculation 
     now 
     = 
     datetime 
     . 
     datetime 
     . 
     utcnow 
     () 
     fallback_hour 
     = 
     ( 
     now 
     - 
     datetime 
     . 
     timedelta 
     ( 
     minutes 
     = 
     lookback_minutes 
     )) 
     . 
     replace 
     ( 
     minute 
     = 
     0 
     , 
     second 
     = 
     0 
     , 
     microsecond 
     = 
     0 
     ) 
     last_processed 
     = 
     read_state_file 
     ( 
     bucket 
     , 
     state_key 
     ) 
     target_hour 
     = 
     ( 
     last_processed 
     + 
     datetime 
     . 
     timedelta 
     ( 
     hours 
     = 
     1 
     )) 
     if 
     last_processed 
     else 
     fallback_hour 
     start_iso 
     = 
     target_hour 
     . 
     isoformat 
     () 
     + 
     "Z" 
     end_iso 
     = 
     ( 
     target_hour 
     + 
     datetime 
     . 
     timedelta 
     ( 
     hours 
     = 
     1 
     )) 
     . 
     isoformat 
     () 
     + 
     "Z" 
     # Authentication 
     token 
     = 
     get_citrix_token 
     ( 
     api_base 
     , 
     customer_id 
     , 
     client_id 
     , 
     client_secret 
     ) 
     headers 
     = 
     { 
     "Authorization" 
     : 
     f 
     "CWSAuth bearer= 
     { 
     token 
     } 
     " 
     , 
     "Citrix-CustomerId" 
     : 
     customer_id 
     , 
     "Accept" 
     : 
     "application/json" 
     , 
     "Accept-Encoding" 
     : 
     "gzip, deflate, br" 
     , 
     "User-Agent" 
     : 
     "citrix-monitor-s3-collector/1.0" 
     } 
     total_records 
     = 
     0 
     # Process each entity type 
     for 
     entity 
     in 
     entities 
     : 
     rows_batch 
     = 
     [] 
     try 
     : 
     entity_generator 
     = 
     fetch_entity_rows 
     ( 
     entity 
     = 
     entity 
     , 
     start_iso 
     = 
     start_iso 
     if 
     use_time_filter 
     else 
     None 
     , 
     end_iso 
     = 
     end_iso 
     if 
     use_time_filter 
     else 
     None 
     , 
     page_size 
     = 
     page_size 
     , 
     headers 
     = 
     headers 
     , 
     api_base 
     = 
     api_base 
     ) 
     for 
     row 
     in 
     entity_generator 
     : 
     # Store raw Citrix data directly for proper parser recognition 
     rows_batch 
     . 
     append 
     ( 
     row 
     ) 
     # Write in batches to avoid memory issues 
     if 
     len 
     ( 
     rows_batch 
     ) 
    > = 
     1000 
     : 
     s3_key 
     = 
     f 
     " 
     { 
     prefix 
     } 
     / 
     { 
     entity 
     } 
     /year= 
     { 
     target_hour 
     . 
     year 
     : 
     04d 
     } 
     /month= 
     { 
     target_hour 
     . 
     month 
     : 
     02d 
     } 
     /day= 
     { 
     target_hour 
     . 
     day 
     : 
     02d 
     } 
     /hour= 
     { 
     target_hour 
     . 
     hour 
     : 
     02d 
     } 
     /part- 
     { 
     uuid 
     . 
     uuid4 
     () 
     . 
     hex 
     } 
     .ndjson" 
     write_ndjson_to_s3 
     ( 
     bucket 
     , 
     s3_key 
     , 
     rows_batch 
     ) 
     total_records 
     += 
     len 
     ( 
     rows_batch 
     ) 
     rows_batch 
     = 
     [] 
     except 
     Exception 
     as 
     ex 
     : 
     print 
     ( 
     f 
     "Error processing entity 
     { 
     entity 
     } 
     : 
     { 
     str 
     ( 
     ex 
     ) 
     } 
     " 
     ) 
     continue 
     # Write remaining records 
     if 
     rows_batch 
     : 
     s3_key 
     = 
     f 
     " 
     { 
     prefix 
     } 
     / 
     { 
     entity 
     } 
     /year= 
     { 
     target_hour 
     . 
     year 
     : 
     04d 
     } 
     /month= 
     { 
     target_hour 
     . 
     month 
     : 
     02d 
     } 
     /day= 
     { 
     target_hour 
     . 
     day 
     : 
     02d 
     } 
     /hour= 
     { 
     target_hour 
     . 
     hour 
     : 
     02d 
     } 
     /part- 
     { 
     uuid 
     . 
     uuid4 
     () 
     . 
     hex 
     } 
     .ndjson" 
     write_ndjson_to_s3 
     ( 
     bucket 
     , 
     s3_key 
     , 
     rows_batch 
     ) 
     total_records 
     += 
     len 
     ( 
     rows_batch 
     ) 
     # Update state file 
     write_state_file 
     ( 
     bucket 
     , 
     state_key 
     , 
     target_hour 
     ) 
     return 
     { 
     "statusCode" 
     : 
     200 
     , 
     "body" 
     : 
     json 
     . 
     dumps 
     ({ 
     "success" 
     : 
     True 
     , 
     "hour_collected" 
     : 
     start_iso 
     , 
     "records_written" 
     : 
     total_records 
     , 
     "entities_processed" 
     : 
     entities 
     }) 
     } 
     
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables, replacing with your values:

    Key Example value
    S3_BUCKET citrix-monitor-logs
    S3_PREFIX citrix_monitor
    STATE_KEY citrix_monitor/state.json
    CITRIX_CLIENT_ID your-client-id
    CITRIX_CLIENT_SECRET your-client-secret
    CITRIX_CUSTOMER_ID your-customer-id
    API_BASE https://api.cloud.com
    ENTITIES Machines,Sessions,Connections,Applications,Users
    PAGE_SIZE 1000
    LOOKBACK_MINUTES 75
    USE_TIME_FILTER true
  8. After the function is created, stay on its page (or open Lambda > Functions > CitrixMonitorCollector).

  9. Select the Configurationtab.

  10. In the General configurationpanel click Edit.

  11. Change Timeoutto 5 minutes (300 seconds)and click Save.

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate( 1 hour )
    • Target: your Lambda function CitrixMonitorCollector
    • Name: CitrixMonitorCollector-1h
  3. Click Create schedule.

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console. go to IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: secops-reader
    • Access type: Access key — Programmatic access
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

      { 
      
     "Version" 
     : 
      
     "2012-10-17" 
     , 
      
     "Statement" 
     : 
      
     [ 
      
     { 
      
     "Effect" 
     : 
      
     "Allow" 
     , 
      
     "Action" 
     : 
      
     [ 
     "s3:GetObject" 
     ], 
      
     "Resource" 
     : 
      
     "arn:aws:s3:::citrix-monitor-logs/*" 
      
     }, 
      
     { 
      
     "Effect" 
     : 
      
     "Allow" 
     , 
      
     "Action" 
     : 
      
     [ 
     "s3:ListBucket" 
     ], 
      
     "Resource" 
     : 
      
     "arn:aws:s3:::citrix-monitor-logs" 
      
     } 
      
     ] 
     } 
     
    
  7. Set the name to secops-reader-policy .

  8. Go to Create policy > search/select > Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV(these values are entered into the feed).

Configure a feed in Google SecOps to ingest Citrix Monitor Service logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed namefield, enter a name for the feed (for example, Citrix Monitor Service logs ).
  4. Select Amazon S3 V2as the Source type.
  5. Select Citrix Monitoras the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://citrix-monitor-logs/citrix_monitor/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default 180 Days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace .
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalizescreen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.

Create a Mobile Website
View Site in Mobile | Classic
Share by: