Collect PingOne Advanced Identity Cloud logs

Supported in:

Google SecOps SIEM

This document explains how to ingest PingOne Advanced Identity Cloud logs into Google Security Operations by using Amazon S3.

Before you begin

  • Google SecOps instance
  • Privileged access to a PingOne Advanced Identity Cloud tenant
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Get PingOne API key and tenant FQDN

  1. Sign in to the Advanced Identity Cloud admin console.
  2. Click the user icon, and then click Tenant Settings.
  3. On the Global Settings tab, click Log API Keys.
  4. Click New Log API Key and provide a name for the key.
  5. Click Create Key.
  6. Copy and save the api_key_id and api_key_secret values in a secure location. The api_key_secret value is not displayed again.
  7. Click Done.
  8. Go to Tenant Settings > Details, and find your tenant FQDN (for example, example.tomcat.pingone.com).
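
Optionally, verify the key before wiring up AWS. The following Python sketch is not part of the official procedure; it assumes the tenant's documented GET /monitoring/logs/sources endpoint and uses placeholder values for the FQDN and key:

      #!/usr/bin/env python3
      # Sanity check: list the log sources this API key can read.
      # Placeholders below: replace with your tenant FQDN and saved key values.
      import json
      from urllib.request import Request, urlopen

      FQDN = "example.tomcat.pingone.com"
      headers = {
          "x-api-key": "<api_key_id>",
          "x-api-secret": "<api_key_secret>",
      }

      req = Request(f"https://{FQDN}/monitoring/logs/sources", headers=headers)
      with urlopen(req, timeout=30) as r:
          print(json.dumps(json.loads(r.read()), indent=2))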

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for future reference (for example, pingone-aic-logs).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.
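
If you prefer to script the bucket creation, a minimal boto3 sketch follows; it assumes locally configured AWS credentials with s3:CreateBucket permission, and the bucket name and Region are placeholders:

      import boto3

      region = "us-east-1"          # your Region
      bucket = "pingone-aic-logs"   # your bucket name

      s3 = boto3.client("s3", region_name=region)
      if region == "us-east-1":
          # us-east-1 buckets are created without a LocationConstraint
          s3.create_bucket(Bucket=bucket)
      else:
          s3.create_bucket(
              Bucket=bucket,
              CreateBucketConfiguration={"LocationConstraint": region},
          )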

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutPingOneAICObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::pingone-aic-logs/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::pingone-aic-logs/pingone-aic/logs/state.json"
          }
        ]
      }
     
    
    • Replace pingone-aic-logs if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy.

  6. Name the role WritePingOneAICToS3Role and click Create role.
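
The console steps above can also be scripted. The following boto3 sketch creates an equivalent policy and role; it is illustrative only, the policy name WritePingOneAICToS3Policy is a hypothetical label, and the policy document is abbreviated to the PutObject statement from step 2:

      import json
      import boto3

      iam = boto3.client("iam")

      # Abbreviated S3 write policy (see step 2 for the full document).
      policy_doc = {
          "Version": "2012-10-17",
          "Statement": [{
              "Sid": "AllowPutPingOneAICObjects",
              "Effect": "Allow",
              "Action": "s3:PutObject",
              "Resource": "arn:aws:s3:::pingone-aic-logs/*",
          }],
      }
      policy = iam.create_policy(
          PolicyName="WritePingOneAICToS3Policy",  # hypothetical name
          PolicyDocument=json.dumps(policy_doc),
      )

      # Trust policy that lets the Lambda service assume the role.
      trust = {
          "Version": "2012-10-17",
          "Statement": [{
              "Effect": "Allow",
              "Principal": {"Service": "lambda.amazonaws.com"},
              "Action": "sts:AssumeRole",
          }],
      }
      iam.create_role(
          RoleName="WritePingOneAICToS3Role",
          AssumeRolePolicyDocument=json.dumps(trust),
      )
      iam.attach_role_policy(
          RoleName="WritePingOneAICToS3Role",
          PolicyArn=policy["Policy"]["Arn"],
      )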

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

     Setting          Value
     Name             pingone_aic_to_s3
     Runtime          Python 3.13
     Architecture     x86_64
     Execution role   WritePingOneAICToS3Role
  4. After the function is created, open the Code tab, delete the stub, and enter the following code (pingone_aic_to_s3.py):

      #!/usr/bin/env python3
      # Pull PingOne Advanced Identity Cloud logs over /monitoring/logs and
      # write each API page to S3, tracking progress in a state object.
      import os, json, time, urllib.parse
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError

      import boto3

      FQDN = os.environ["AIC_TENANT_FQDN"].strip("/")
      API_KEY_ID = os.environ["AIC_API_KEY_ID"]
      API_KEY_SECRET = os.environ["AIC_API_SECRET"]
      S3_BUCKET = os.environ["S3_BUCKET"]
      S3_PREFIX = os.environ.get("S3_PREFIX", "pingone-aic/logs/").strip("/")
      SOURCES = [s.strip() for s in os.environ.get("SOURCES", "am-everything,idm-everything").split(",") if s.strip()]
      PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "500")), 1000)  # hard cap per docs
      MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
      STATE_KEY = os.environ.get("STATE_KEY", "pingone-aic/logs/state.json")
      LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))

      s3 = boto3.client("s3")

      def _headers():
          return {"x-api-key": API_KEY_ID, "x-api-secret": API_KEY_SECRET}

      def _iso(ts: float) -> str:
          return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))

      def _http_get(url: str, timeout: int = 60, max_retries: int = 5) -> dict:
          # GET with retries: honors 429 rate limits, backs off on 5xx and network errors.
          attempt, backoff = 0, 1.0
          while True:
              req = Request(url, method="GET", headers=_headers())
              try:
                  with urlopen(req, timeout=timeout) as r:
                      return json.loads(r.read().decode("utf-8"))
              except HTTPError as e:
                  # 429: respect X-RateLimit-Reset (epoch seconds) if present
                  if e.code == 429 and attempt < max_retries:
                      reset = e.headers.get("X-RateLimit-Reset")
                      now = int(time.time())
                      delay = max(1, int(reset) - now) if (reset and reset.isdigit()) else int(backoff)
                      time.sleep(delay)
                      attempt += 1
                      backoff *= 2
                      continue
                  if 500 <= e.code <= 599 and attempt < max_retries:
                      time.sleep(backoff)
                      attempt += 1
                      backoff *= 2
                      continue
                  raise
              except URLError:
                  if attempt < max_retries:
                      time.sleep(backoff)
                      attempt += 1
                      backoff *= 2
                      continue
                  raise

      def _load_state() -> dict:
          try:
              obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
              return json.loads(obj["Body"].read())
          except Exception:
              return {"sources": {}}

      def _save_state(state: dict):
          s3.put_object(
              Bucket=S3_BUCKET,
              Key=STATE_KEY,
              Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )

      def _write_page(payload: dict, source: str) -> str:
          # Store each API page as a timestamped JSON object under the prefix.
          ts = time.gmtime()
          key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-pingone-aic-{source}.json"
          s3.put_object(
              Bucket=S3_BUCKET,
              Key=key,
              Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )
          return key

      def _bounded_begin_time(last_ts: str | None, now: float) -> str:
          # beginTime must be <= 24h before endTime (now if endTime omitted)
          # if last_ts older than 24h → cap to now-24h; else use last_ts; else lookback
          twenty_four_h_ago = now - 24 * 3600
          if last_ts:
              try:
                  t_struct = time.strptime(last_ts[:19] + "Z", "%Y-%m-%dT%H:%M:%SZ")
                  t_epoch = int(time.mktime(t_struct))
              except Exception:
                  t_epoch = int(now - LOOKBACK_SECONDS)
              begin_epoch = max(t_epoch, int(twenty_four_h_ago))
          else:
              begin_epoch = max(int(now - LOOKBACK_SECONDS), int(twenty_four_h_ago))
          return _iso(begin_epoch)

      def fetch_source(source: str, last_ts: str | None):
          base = f"https://{FQDN}/monitoring/logs"
          now = time.time()
          params = {
              "source": source,
              "_pageSize": str(PAGE_SIZE),
              "_sortKeys": "timestamp",
              "beginTime": _bounded_begin_time(last_ts, now),
          }
          pages = 0
          written = 0
          newest_ts = last_ts
          cookie = None
          while pages < MAX_PAGES:
              if cookie:
                  params["_pagedResultsCookie"] = cookie
              qs = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
              data = _http_get(f"{base}?{qs}")
              _write_page(data, source)
              results = data.get("result") or data.get("results") or []
              # Track the newest event timestamp so the next run resumes from there.
              for item in results:
                  t = item.get("timestamp") or item.get("payload", {}).get("timestamp")
                  if t and (newest_ts is None or t > newest_ts):
                      newest_ts = t
              written += len(results)
              cookie = data.get("pagedResultsCookie")
              pages += 1
              if not cookie:
                  break
          return {"source": source, "pages": pages, "written": written, "newest_ts": newest_ts}

      def lambda_handler(event=None, context=None):
          state = _load_state()
          state.setdefault("sources", {})
          summary = []
          for source in SOURCES:
              last_ts = state["sources"].get(source, {}).get("last_ts")
              res = fetch_source(source, last_ts)
              if res.get("newest_ts"):
                  state["sources"][source] = {"last_ts": res["newest_ts"]}
              summary.append(res)
          _save_state(state)
          return {"ok": True, "summary": summary}

      if __name__ == "__main__":
          print(lambda_handler())
     
    
  5. Go to Configuration > Environment variables > Edit > Add new environment variable.

  6. Enter the following environment variables, replacing the example values with your own:

     Key                Example
     S3_BUCKET          pingone-aic-logs
     S3_PREFIX          pingone-aic/logs/
     STATE_KEY          pingone-aic/logs/state.json
     AIC_TENANT_FQDN    example.tomcat.pingone.com
     AIC_API_KEY_ID     <api_key_id>
     AIC_API_SECRET     <api_key_secret>
     SOURCES            am-everything,idm-everything
     PAGE_SIZE          500
     MAX_PAGES          20
     LOOKBACK_SECONDS   3600
  7. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  8. Select the Configuration tab.

  9. In the General configuration panel, click Edit.

  10. Change Timeout to 5 minutes (300 seconds) and click Save.
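
If you manage the function from code, the same settings can be applied with boto3. This sketch assumes the function already exists and that your local credentials may update it; note that update_function_configuration replaces the entire environment block:

      import boto3

      lam = boto3.client("lambda")
      lam.update_function_configuration(
          FunctionName="pingone_aic_to_s3",
          Timeout=300,  # 5 minutes
          Environment={"Variables": {
              "S3_BUCKET": "pingone-aic-logs",
              "S3_PREFIX": "pingone-aic/logs/",
              "STATE_KEY": "pingone-aic/logs/state.json",
              "AIC_TENANT_FQDN": "example.tomcat.pingone.com",
              "AIC_API_KEY_ID": "<api_key_id>",
              "AIC_API_SECRET": "<api_key_secret>",
              "SOURCES": "am-everything,idm-everything",
              "PAGE_SIZE": "500",
              "MAX_PAGES": "20",
              "LOOKBACK_SECONDS": "3600",
          }},
      )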

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function.
    • Name: pingone-aic-1h.
  3. Click Create schedule.
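
The same schedule can be created with the boto3 scheduler client, as sketched below. Both ARNs are placeholders, and the RoleArn must reference an existing role that EventBridge Scheduler is allowed to assume to invoke the function:

      import boto3

      scheduler = boto3.client("scheduler")
      scheduler.create_schedule(
          Name="pingone-aic-1h",
          ScheduleExpression="rate(1 hour)",
          FlexibleTimeWindow={"Mode": "OFF"},
          Target={
              # Placeholder ARNs: substitute your Region, account ID, and role.
              "Arn": "arn:aws:lambda:REGION:ACCOUNT_ID:function:pingone_aic_to_s3",
              "RoleArn": "arn:aws:iam::ACCOUNT_ID:role/scheduler-invoke-role",
          },
      )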

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users, then click Add users.
  2. Provide the following configuration details:
    • User: Enter a unique name (for example, secops-reader).
    • Access type: Select Access key - Programmatic access.
    • Click Create user.
  3. Attach a minimal read policy (custom): go to Users > select secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  4. In the JSON editor, enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::<your-bucket>/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::<your-bucket>"
          }
        ]
      }
     
    
  5. Set the name to secops-reader-policy.

  6. Click Create policy, then return to the user's Attach policies directly screen, search for and select secops-reader-policy, and click Next > Add permissions.

  7. Go to Security credentials > Access keys > Create access key.

  8. Download the CSV (these values are entered into the feed).
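
To confirm the new keys work before configuring the feed, a short boto3 sketch like the following lists objects under the log prefix using only the read-only credentials (the key values are placeholders from the downloaded CSV):

      import boto3

      s3 = boto3.client(
          "s3",
          aws_access_key_id="<secops-reader access key>",
          aws_secret_access_key="<secops-reader secret key>",
      )
      resp = s3.list_objects_v2(Bucket="pingone-aic-logs", Prefix="pingone-aic/logs/")
      for obj in resp.get("Contents", []):
          print(obj["Key"])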

Configure a feed in Google SecOps to ingest PingOne Advanced Identity Cloud logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, PingOne Advanced Identity Cloud).
  4. Select Amazon S3 V2 as the Source type.
  5. Select PingOne Advanced Identity Cloud as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://pingone-aic-logs/pingone-aic/logs/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Default 180 Days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label to be applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.
