Collect Duo administrator logs

This document explains how to ingest Duo administrator logs to Google Security Operations using Amazon S3. The parser extracts fields from the logs (JSON format) and maps them to the Unified Data Model (UDM). It handles various Duo action types (login, user management, group management) differently, populating relevant UDM fields based on the action and available data, including user details, authentication factors, and security results. It also performs data transformations, such as merging IP addresses, converting timestamps, and handling errors.

Before you begin

  • Google SecOps instance
  • Privileged access to Duo tenant (Admin API application)
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Configure Duo Admin API application

  1. Sign in to Duo Admin Panel.
  2. Go to Applications > Application Catalog.
  3. Add the Admin API application.
  4. Record the following values:
    • Integration key (ikey)
    • Secret key (skey)
    • API hostname (for example, api-XXXXXXXX.duosecurity.com)
  5. In Permissions, enable Grant read log (to read administrator logs).
  6. Save the application.
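Duo Admin API requests are authenticated with an HMAC-SHA1 signature over the request date, method, host, path, and canonicalized query parameters. The sketch below shows how the headers are built so you can sanity-check your recorded ikey/skey/hostname; all credential values here are placeholders, and the Lambda function later in this guide uses the same scheme.

```python
import base64
import hashlib
import hmac
import urllib.parse

def duo_sign(ikey: str, skey: str, method: str, host: str, path: str,
             params: dict, date: str) -> dict:
    """Build the Date and Authorization headers for a Duo Admin API call."""
    # Canonical query string: sorted keys, '~'-safe percent-encoding
    canon_qs = "&".join(
        f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}"
        for k, v in sorted(params.items())
    )
    # The signed canonical string: date, METHOD, host, path, query string
    canon = "\n".join([date, method.upper(), host.lower(), path, canon_qs])
    sig = hmac.new(skey.encode(), canon.encode(), hashlib.sha1).hexdigest()
    auth = base64.b64encode(f"{ikey}:{sig}".encode()).decode()
    return {"Date": date, "Authorization": f"Basic {auth}"}

# Placeholder credentials and a fixed RFC 2822 date for a reproducible example
headers = duo_sign("DIXXXXXXXXXXXXXXXXXX", "secret", "GET",
                   "api-xxxxxxxx.duosecurity.com", "/admin/v1/logs/administrator",
                   {"mintime": 1700000000}, "Tue, 21 Nov 2023 12:00:00 -0000")
print(headers["Authorization"])
```

A wrong skey produces a valid-looking header but a 401 from Duo, so a quick signed GET against /admin/v1/logs/administrator is the most direct credential check.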

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for future reference (for example, duo-admin-logs).
  3. Create a user following this user guide: Creating an IAM user .
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. Go to AWS console > IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutDuoAdminObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::duo-admin-logs/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::duo-admin-logs/duo/admin/state.json"
          }
        ]
      }
    • Replace duo-admin-logs if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy.

  6. Name the role WriteDuoAdminToS3Role and click Create role.
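If you chose a different bucket name or state key, you can render the policy document above with a short script instead of editing the JSON by hand. This is a minimal sketch; the defaults match the values used in this guide, and `build_lambda_policy` is a helper name introduced here for illustration.

```python
import json

def build_lambda_policy(bucket: str, state_key: str = "duo/admin/state.json") -> str:
    """Render the S3 write/read IAM policy for the Duo collector Lambda."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutDuoAdminObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                # The Lambda writes raw pages (and the state file) under this bucket
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                # Read access is only needed for the checkpoint object
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }
    return json.dumps(policy, indent=2)

print(build_lambda_policy("duo-admin-logs"))
```

Paste the output into the JSON tab of the policy editor.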

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting          Value
    Name             duo_admin_to_s3
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   WriteDuoAdminToS3Role
  4. After the function is created, open the Code tab, delete the stub, and enter the following code (duo_admin_to_s3.py):

      #!/usr/bin/env python3
      # Lambda: Pull Duo Admin API v1 Administrator Logs to S3 (raw JSON pages)
      import os, json, time, hmac, hashlib, base64, email.utils, urllib.parse
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError
      from datetime import datetime
      import boto3

      DUO_IKEY = os.environ["DUO_IKEY"]
      DUO_SKEY = os.environ["DUO_SKEY"]
      DUO_API_HOSTNAME = os.environ["DUO_API_HOSTNAME"].strip()
      S3_BUCKET = os.environ["S3_BUCKET"]
      S3_PREFIX = os.environ.get("S3_PREFIX", "duo/admin/").strip("/")
      STATE_KEY = os.environ.get("STATE_KEY", "duo/admin/state.json")

      s3 = boto3.client("s3")

      def _canon_params(params: dict) -> str:
          parts = []
          for k in sorted(params.keys()):
              v = params[k]
              if v is None:
                  continue
              parts.append(f"{urllib.parse.quote(str(k), '~')}={urllib.parse.quote(str(v), '~')}")
          return "&".join(parts)

      def _sign(method: str, host: str, path: str, params: dict) -> dict:
          now = email.utils.formatdate()
          canon = "\n".join([now, method.upper(), host.lower(), path, _canon_params(params)])
          sig = hmac.new(DUO_SKEY.encode("utf-8"), canon.encode("utf-8"), hashlib.sha1).hexdigest()
          auth = base64.b64encode(f"{DUO_IKEY}:{sig}".encode()).decode()
          return {"Date": now, "Authorization": f"Basic {auth}"}

      def _http(method: str, path: str, params: dict, timeout: int = 60, max_retries: int = 5) -> dict:
          host = DUO_API_HOSTNAME
          assert host.startswith("api-") and host.endswith(".duosecurity.com"), \
              "DUO_API_HOSTNAME must be like api-XXXXXXXX.duosecurity.com"
          qs = _canon_params(params)
          url = f"https://{host}{path}" + (f"?{qs}" if qs else "")
          attempt, backoff = 0, 1.0
          while True:
              req = Request(url, method=method.upper())
              hdrs = _sign(method, host, path, params)
              req.add_header("Accept", "application/json")
              for k, v in hdrs.items():
                  req.add_header(k, v)
              try:
                  with urlopen(req, timeout=timeout) as r:
                      return json.loads(r.read().decode("utf-8"))
              except HTTPError as e:
                  # 429 or 5xx -> exponential backoff
                  if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                      time.sleep(backoff)
                      attempt += 1
                      backoff *= 2
                      continue
                  raise
              except URLError:
                  if attempt < max_retries:
                      time.sleep(backoff)
                      attempt += 1
                      backoff *= 2
                      continue
                  raise

      def _read_state() -> int | None:
          try:
              obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
              return int(json.loads(obj["Body"].read()).get("mintime"))
          except Exception:
              return None

      def _write_state(mintime: int):
          body = json.dumps({"mintime": mintime}).encode("utf-8")
          s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body, ContentType="application/json")

      def _epoch_from_item(item: dict) -> int | None:
          # Prefer numeric 'timestamp' (seconds); fallback to ISO8601 'ts'
          ts_num = item.get("timestamp")
          if isinstance(ts_num, (int, float)):
              return int(ts_num)
          ts_iso = item.get("ts")
          if isinstance(ts_iso, str):
              try:
                  # Accept "...Z" or with offset
                  return int(datetime.fromisoformat(ts_iso.replace("Z", "+00:00")).timestamp())
              except Exception:
                  return None
          return None

      def _write_page(payload: dict, when: int, page: int) -> str:
          key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"
          s3.put_object(
              Bucket=S3_BUCKET,
              Key=key,
              Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )
          return key

      def fetch_and_store():
          now = int(time.time())
          # Start from last checkpoint or now-3600 on first run
          mintime = _read_state() or (now - 3600)
          page = 0
          total = 0
          next_mintime = mintime
          max_seen_ts = mintime
          while True:
              data = _http("GET", "/admin/v1/logs/administrator", {"mintime": mintime})
              _write_page(data, now, page)
              page += 1
              # Extract items
              resp = data.get("response")
              items = resp if isinstance(resp, list) else (resp.get("items") if isinstance(resp, dict) else [])
              items = items or []
              if not items:
                  break
              total += len(items)
              # Track the newest timestamp in this batch
              for it in items:
                  ts = _epoch_from_item(it)
                  if ts and ts > max_seen_ts:
                      max_seen_ts = ts
              # Duo returns only the 1000 earliest events; page by advancing mintime
              if len(items) >= 1000 and max_seen_ts >= mintime:
                  mintime = max_seen_ts
                  next_mintime = max_seen_ts
                  continue
              else:
                  break
          # Save checkpoint: newest seen ts, or "now" if nothing new
          if max_seen_ts > next_mintime:
              _write_state(max_seen_ts)
              next_state = max_seen_ts
          else:
              _write_state(now)
              next_state = now
          return {"ok": True, "pages": page, "events": total, "next_mintime": next_state}

      def lambda_handler(event=None, context=None):
          return fetch_and_store()

      if __name__ == "__main__":
          print(lambda_handler())
  5. Go to Configuration > Environment variables > Edit > Add new environment variable.

  6. Enter the following environment variables, replacing the example values with your own.

    Key                Example
    S3_BUCKET          duo-admin-logs
    S3_PREFIX          duo/admin/
    STATE_KEY          duo/admin/state.json
    DUO_IKEY           DIXYZ...
    DUO_SKEY           ****************
    DUO_API_HOSTNAME   api-XXXXXXXX.duosecurity.com
  7. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  8. Select the Configuration tab.

  9. In the General configuration panel, click Edit.

  10. Change Timeout to 5 minutes (300 seconds) and click Save.
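Each run of the function writes every raw response page to a date-partitioned key derived from the run time. The following sketch mirrors the key scheme produced by `_write_page` in the code above (`page_key` is a helper name introduced here for illustration; the prefix shown is this guide's default):

```python
import time

def page_key(prefix: str, when: int, page: int) -> str:
    """Mirror the S3 key layout used by _write_page:
    <prefix>/YYYY/MM/DD/duo-admin-NNNNN.json (date in UTC)."""
    return f"{prefix}/{time.strftime('%Y/%m/%d', time.gmtime(when))}/duo-admin-{page:05d}.json"

# First page of a run at epoch 1700000000 (2023-11-14 22:13:20 UTC)
print(page_key("duo/admin", 1700000000, 0))
# → duo/admin/2023/11/14/duo-admin-00000.json
```

This layout is why the feed's S3 URI points at the prefix (s3://duo-admin-logs/duo/admin/) rather than at individual files.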

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function.
    • Name: duo-admin-1h.
  3. Click Create schedule.

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users, then click Add users.
  2. Provide the following configuration details:
    • User: Enter a unique name (for example, secops-reader)
    • Access type: Select Access key - Programmatic access
    • Click Create user.
  3. Attach minimal read policy (custom): Users > select secops-reader > Permissions > Add permissions > Attach policies directly > Create policy
  4. In the JSON editor, enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::<your-bucket>/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::<your-bucket>"
          }
        ]
      }
  5. Set the name to secops-reader-policy.

  6. Click Create policy. Then, back on the Add permissions screen, search for and select secops-reader-policy, and click Next > Add permissions.

  7. Go to Security credentials > Access keys > Create access key.

  8. Download the CSV file (these values are entered into the feed).

Configure a feed in Google SecOps to ingest Duo Administrator Logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Duo Administrator Logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Duo Administrator Logs as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://duo-admin-logs/duo/admin/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Default 180 Days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: the asset namespace.
    • Ingestion labels: the label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

UDM Mapping Table

Log Field | UDM Mapping | Logic
action | metadata.product_event_type | The value of the action field from the raw log.
desc | metadata.description | The value of the desc field from the raw log's description object.
description._status | target.group.attribute.labels.value | The value of the _status field within the description object, used when processing group-related actions. The value is placed in a labels array with a corresponding key of "status".
description.desc | metadata.description | The value of the desc field from the raw log's description object.
description.email | target.user.email_addresses | The value of the email field from the raw log's description object.
description.error | security_result.summary | The value of the error field from the raw log's description object.
description.factor | extensions.auth.auth_details | The value of the factor field from the raw log's description object.
description.groups.0._status | target.group.attribute.labels.value | The value of the _status field from the first element of the groups array in the description object. The value is placed in a labels array with a corresponding key of "status".
description.groups.0.name | target.group.group_display_name | The value of the name field from the first element of the groups array in the description object.
description.ip_address | principal.ip | The value of the ip_address field from the raw log's description object.
description.name | target.group.group_display_name | The value of the name field from the raw log's description object.
description.realname | target.user.user_display_name | The value of the realname field from the raw log's description object.
description.status | target.user.attribute.labels.value | The value of the status field from the raw log's description object. The value is placed in a labels array with a corresponding key of "status".
description.uname | target.user.email_addresses or target.user.userid | The value of the uname field from the raw log's description object. If it matches an email address format, it is mapped to email_addresses; otherwise, it is mapped to userid.
host | principal.hostname | The value of the host field from the raw log.
isotimestamp | metadata.event_timestamp.seconds | The value of the isotimestamp field from the raw log, converted to epoch seconds.
object | target.group.group_display_name | The value of the object field from the raw log.
timestamp | metadata.event_timestamp.seconds | The value of the timestamp field from the raw log.
username | target.user.userid or principal.user.userid | If the action field contains "login", the value is mapped to target.user.userid; otherwise, it is mapped to principal.user.userid.

In addition, the parser sets the following values, which are not taken from a single log field:

  • extensions.auth.mechanism: Set to "USERNAME_PASSWORD" if the action field contains "login".
  • metadata.event_type: Determined by the parser based on the action field. Possible values: USER_LOGIN, GROUP_CREATION, USER_UNCATEGORIZED, GROUP_DELETION, USER_CREATION, GROUP_MODIFICATION, GENERIC_EVENT.
  • metadata.product_name: Always set to "DUO_ADMIN".
  • extensions.auth.type: Always set to "MULTI-FACTOR_AUTHENTICATION".
  • metadata.vendor_name: Always set to "DUO_SECURITY".
  • target.user.attribute.roles.name: Set to "ADMINISTRATOR" if the eventtype field contains "admin".
  • security_result.action: Set to "BLOCK" if the action field contains "error"; otherwise, set to "ALLOW".
  • target.group.attribute.labels.key and target.user.attribute.labels.key: Always set to "status".
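As an illustration of the description.uname logic in the table above, the parser routes the value by whether it looks like an email address. The sketch below is hypothetical (it is not the actual parser code, and `map_uname` and its regex are assumptions introduced here):

```python
import re

# Simple email shape check: one "@", no whitespace, a dot in the domain
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def map_uname(uname: str) -> dict:
    """Illustrate the description.uname routing described in the mapping table."""
    if EMAIL_RE.match(uname):
        # Email-shaped values land in target.user.email_addresses
        return {"target.user.email_addresses": [uname]}
    # Everything else is treated as a plain user ID
    return {"target.user.userid": uname}

print(map_uname("jdoe@example.com"))  # routed to email_addresses
print(map_uname("jdoe"))              # routed to userid
```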

Need more help? Get answers from Community members and Google SecOps professionals.
