Collect Snyk group-level audit logs

This document explains how to ingest Snyk group-level audit logs to Google Security Operations using Amazon S3. The parser first cleans up unnecessary fields from the raw logs. Then, it extracts relevant information like user details, event type, and timestamps, transforming and mapping them into the Google SecOps UDM schema for standardized security log representation.

Before you begin

Make sure you have the following prerequisites:

  • Google SecOps instance
  • Privileged access to Snyk (Group Admin) and an API token with access to the Group
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Collect Snyk Group level audit logs prerequisites (IDs, API keys, org IDs, tokens)

  1. In Snyk, click your avatar > Account settings > API token.
    • Click Revoke & regenerate (or Generate) and copy the token.
    • Save this token as the SNYK_API_TOKEN environment variable.
  2. In Snyk, switch to your Group (top-left switcher).
    • Go to Group settings. Copy the <GROUP_ID> from the URL: https://app.snyk.io/group/<GROUP_ID>/settings.
    • Alternatively, use the REST API: GET https://api.snyk.io/rest/groups?version=2021-06-04 and use the id value from the response.
  3. Ensure the token user has the View Audit Logs (group.audit.read) permission.
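
The REST lookup in step 2 can be sketched in Python with only the standard library. The `token` authorization scheme and the `Accept` header mirror those used by the Lambda function later in this document. The response shape (`data[].id` and `data[].attributes.name`) is an assumption, and `build_groups_request` and `list_group_ids` are hypothetical helper names, not part of any Snyk SDK.

```python
import json
import os
from urllib.request import Request, urlopen

GROUPS_URL = "https://api.snyk.io/rest/groups?version=2021-06-04"


def build_groups_request(api_token: str) -> Request:
    """Build the GET request for the groups endpoint named in step 2."""
    return Request(
        GROUPS_URL,
        method="GET",
        headers={
            "Authorization": f"token {api_token}",
            "Accept": "application/vnd.api+json",
        },
    )


def list_group_ids(payload: dict) -> list[tuple[str, str]]:
    """Return (id, name) pairs; assumes a JSON:API style `data` array."""
    pairs = []
    for item in payload.get("data") or []:
        name = (item.get("attributes") or {}).get("name", "")
        pairs.append((item.get("id", ""), name))
    return pairs


if __name__ == "__main__":
    token = os.environ.get("SNYK_API_TOKEN")
    if token:  # only call the API when a token is configured
        with urlopen(build_groups_request(token), timeout=60) as resp:
            for group_id, name in list_group_ids(json.load(resp)):
                print(group_id, name)
```

If the token belongs to more than one Group, the script prints one line per Group; copy the id of the Group whose audit logs you want to collect.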

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, snyk-audit).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutSnykAuditObjects",
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::snyk-audit/*"
          }
        ]
      }
    
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy.

  6. Name the role WriteSnykAuditToS3Role and click Create role.
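
The policy in step 2 hardcodes the example bucket name snyk-audit. If your bucket has a different name, the same document can be generated with a short script; this is a minimal sketch, and `build_upload_policy` is a hypothetical helper name. The statement keeps s3:GetObject alongside s3:PutObject because the Lambda function in this guide reads its saved cursor from the bucket as well as writing log objects.

```python
import json


def build_upload_policy(bucket: str) -> dict:
    """Return the upload policy from step 2 with the bucket name substituted."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutSnykAuditObjects",
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            }
        ],
    }


if __name__ == "__main__":
    # "my-snyk-audit-bucket" is a placeholder; replace it with your bucket name.
    print(json.dumps(build_upload_policy("my-snyk-audit-bucket"), indent=2))
```

Paste the printed JSON into the policy editor in place of the example above.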

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:
    Setting          Value
    Name             snyk_group_audit_to_s3
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   WriteSnykAuditToS3Role
  4. After the function is created, open the Code tab, delete the stub, and enter the following code (snyk_group_audit_to_s3.py):

      #!/usr/bin/env python3
      # snyk_group_audit_to_s3.py
      # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)

      import os
      import json
      import time
      import urllib.parse
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError

      import boto3

      BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
      GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
      API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
      BUCKET = os.environ["S3_BUCKET"].strip()
      PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
      SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
      MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
      STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
      API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
      LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor

      # Optional filters
      EVENTS_CSV = os.environ.get("EVENTS", "").strip()  # e.g. "group.create,org.user.invited"
      EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()

      s3 = boto3.client("s3")

      HDRS = {
          # REST authentication requires "token" scheme and vnd.api+json Accept
          "Authorization": f"token {API_TOKEN}",
          "Accept": "application/vnd.api+json",
      }


      def _get_state() -> str | None:
          try:
              obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
              return json.loads(obj["Body"].read()).get("cursor")
          except Exception:
              return None


      def _put_state(cursor: str):
          s3.put_object(
              Bucket=BUCKET,
              Key=STATE_KEY,
              Body=json.dumps({"cursor": cursor}).encode("utf-8"),
          )


      def _write(payload: dict) -> str:
          ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
          key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
          s3.put_object(
              Bucket=BUCKET,
              Key=key,
              Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )
          return key


      def _parse_next_cursor_from_links(links: dict | None) -> str | None:
          if not links:
              return None
          nxt = links.get("next")
          if not nxt:
              return None
          try:
              q = urllib.parse.urlparse(nxt).query
              params = urllib.parse.parse_qs(q)
              cur = params.get("cursor")
              return cur[0] if cur else None
          except Exception:
              return None


      def _http_get(url: str) -> dict:
          req = Request(url, method="GET", headers=HDRS)
          try:
              with urlopen(req, timeout=60) as r:
                  return json.loads(r.read().decode("utf-8"))
          except HTTPError as e:
              # Back off on rate limit or transient server errors; single retry
              if e.code in (429, 500, 502, 503, 504):
                  delay = int(e.headers.get("Retry-After", "1"))
                  time.sleep(max(1, delay))
                  with urlopen(req, timeout=60) as r2:
                      return json.loads(r2.read().decode("utf-8"))
              raise


      def _as_list(csv_str: str) -> list[str]:
          return [x.strip() for x in csv_str.split(",") if x.strip()]


      def fetch_page(cursor: str | None, first_run_from_iso: str | None):
          base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
          params: dict[str, object] = {
              "version": API_VERSION,
              "size": SIZE,
          }
          if cursor:
              params["cursor"] = cursor
          elif first_run_from_iso:
              params["from"] = first_run_from_iso  # RFC3339

          events = _as_list(EVENTS_CSV)
          exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
          if events and exclude_events:
              # API does not allow both at the same time; prefer explicit include
              exclude_events = []
          if events:
              params["events"] = events  # will be encoded as repeated params
          if exclude_events:
              params["exclude_events"] = exclude_events

          url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
          return _http_get(url)


      def lambda_handler(event=None, context=None):
          cursor = _get_state()
          pages = 0
          total = 0
          last_cursor = cursor

          # Only for the very first run (no saved cursor), constrain the time window
          first_run_from_iso = None
          if not cursor and LOOKBACK_SECONDS > 0:
              first_run_from_iso = time.strftime(
                  "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
              )

          while pages < MAX_PAGES:
              payload = fetch_page(cursor, first_run_from_iso)
              _write(payload)

              # items are nested under data.items per Snyk docs
              data_obj = payload.get("data") or {}
              items = data_obj.get("items") or []
              if isinstance(items, list):
                  total += len(items)

              cursor = _parse_next_cursor_from_links(payload.get("links"))
              pages += 1
              if not cursor:
                  break

              # after first page, disable from-filter
              first_run_from_iso = None

          if cursor and cursor != last_cursor:
              _put_state(cursor)

          return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}


      if __name__ == "__main__":
          print(lambda_handler())
     
    

Add environment variables

  1. Go to Configuration > Environment variables.
  2. Click Edit > Add new environment variable.
  3. Enter the following environment variables, replacing with your values:

    Key                          Example
    S3_BUCKET                    snyk-audit
    S3_PREFIX                    snyk/audit/
    STATE_KEY                    snyk/audit/state.json
    SNYK_GROUP_ID                <your_group_id>
    SNYK_API_TOKEN               xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE                https://api.snyk.io (optional)
    SNYK_API_VERSION             2021-06-04
    SIZE                         100
    MAX_PAGES                    20
    LOOKBACK_SECONDS             3600
    EVENTS (optional)            group.create,org.user.add
    EXCLUDE_EVENTS (optional)    api.access
  4. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  5. Select the Configuration tab.

  6. In the General configuration panel, click Edit.

  7. Change Timeout to 5 minutes (300 seconds) and click Save.
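
The EVENTS and EXCLUDE_EVENTS variables take comma-separated values. The function splits them and sends each value as a repeated query parameter, and when both variables are set it keeps EVENTS and ignores EXCLUDE_EVENTS. The sketch below reproduces that behavior in isolation so you can preview the query string a given configuration yields; `split_csv` and `build_query` are hypothetical helper names.

```python
import urllib.parse


def split_csv(value: str) -> list[str]:
    """Split a comma-separated value, dropping blanks and surrounding spaces."""
    return [part.strip() for part in value.split(",") if part.strip()]


def build_query(events_csv: str, exclude_csv: str,
                version: str = "2021-06-04", size: int = 100) -> str:
    """Build the query string; EVENTS takes precedence over EXCLUDE_EVENTS."""
    params: dict[str, object] = {"version": version, "size": size}
    events = split_csv(events_csv)
    exclude = [] if events else split_csv(exclude_csv)
    if events:
        params["events"] = events
    if exclude:
        params["exclude_events"] = exclude
    # doseq=True turns each list element into its own key=value pair.
    return urllib.parse.urlencode(params, doseq=True)


print(build_query("group.create, org.user.add", "api.access"))
# version=2021-06-04&size=100&events=group.create&events=org.user.add
print(build_query("", "api.access"))
# version=2021-06-04&size=100&exclude_events=api.access
```

Leaving both variables empty sends neither filter, so the request is not restricted by event type.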

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: Your Lambda function.
    • Name: snyk-group-audit-1h.
  3. Click Create schedule.
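
The hourly schedule interacts with the SIZE, MAX_PAGES, and LOOKBACK_SECONDS variables. A single invocation fetches at most SIZE x MAX_PAGES events, and on the first run (no saved cursor) the function only requests events newer than the current time minus LOOKBACK_SECONDS. The arithmetic can be sketched as follows; the function names are hypothetical and the defaults are the example values from this guide.

```python
import time


def max_events_per_run(size: int = 100, max_pages: int = 20) -> int:
    """Upper bound on events one invocation can fetch (pages x page size)."""
    return size * max_pages


def first_run_from(now_epoch: float, lookback_seconds: int = 3600) -> str:
    """RFC 3339 UTC timestamp used as the `from` filter when no cursor is saved."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now_epoch - lookback_seconds))


print(max_events_per_run())           # 2000
print(first_run_from(1_700_000_000))  # 2023-11-14T21:13:20Z
```

If your Group is likely to produce more events per hour than this ceiling, consider raising MAX_PAGES or shortening the schedule interval.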

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: secops-reader.
    • Access type: Access key — Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::snyk-audit/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::snyk-audit"
          }
        ]
      }
    
  7. Set the name to secops-reader-policy.

  8. Click Create policy, then search for and select secops-reader-policy, and click Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV (these values are entered into the feed).

Configure a feed in Google SecOps to ingest Snyk Group level audit Logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Snyk Group Audit Logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Snyk Group level audit Logs as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://snyk-audit/snyk/audit/
    • Source deletion options: Select deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: snyk.group_audit
    • Ingestion labels: Add if desired.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
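
The S3 URI in step 7 should point at the same bucket and prefix that the Lambda function writes to. The sketch below derives the URI from the S3_BUCKET and S3_PREFIX values and builds an object key in the pattern the function uses, to confirm that written objects fall under the feed's URI. `feed_s3_uri` and `object_key` are hypothetical helper names.

```python
import time


def feed_s3_uri(bucket: str, prefix: str) -> str:
    """S3 URI for the feed: bucket plus prefix, with exactly one trailing slash."""
    return f"s3://{bucket}/{prefix.strip('/')}/"


def object_key(prefix: str, when: time.struct_time) -> str:
    """Object key in the same pattern the Lambda function uses."""
    ts = time.strftime("%Y/%m/%d/%H%M%S", when)
    return f"{prefix.rstrip('/')}/{ts}-snyk-group-audit.json"


uri = feed_s3_uri("snyk-audit", "snyk/audit/")
key = object_key("snyk/audit/", time.gmtime(0))
print(uri)  # s3://snyk-audit/snyk/audit/
print(key)  # snyk/audit/1970/01/01/000000-snyk-group-audit.json
print(f"s3://snyk-audit/{key}".startswith(uri))  # True
```

With the example values, STATE_KEY (snyk/audit/state.json) sits under the same prefix as the log objects, so the feed may also read that file; pointing STATE_KEY at a key outside the prefix is one way to avoid that.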

UDM Mapping Table

Log Field     UDM Mapping                              Logic
content.url   principal.url                            Directly mapped from the content.url field in the raw log.
created       metadata.event_timestamp                 Parsed from the created field in the raw log using the ISO8601 format.
event         metadata.product_event_type              Directly mapped from the event field in the raw log.
groupId       principal.user.group_identifiers         Directly mapped from the groupId field in the raw log.
orgId         principal.user.attribute.labels.key      Set to "orgId".
orgId         principal.user.attribute.labels.value    Directly mapped from the orgId field in the raw log.
userId        principal.user.userid                    Directly mapped from the userId field in the raw log.
N/A           metadata.event_type                      Hardcoded to "USER_UNCATEGORIZED" in the parser code.
N/A           metadata.log_type                        Hardcoded to "SNYK_SDLC" in the parser code.
N/A           metadata.product_name                    Hardcoded to "SNYK SDLC" in the parser code.
N/A           metadata.vendor_name                     Hardcoded to "SNYK_SDLC" in the parser code.
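
The mapping table can be read as a small transformation from a raw record to a nested UDM-style structure. The sketch below is only an illustration of the table, not the actual Google SecOps parser: `to_udm` is a hypothetical function, the nested dictionary layout is an assumption, and the sample record values are made up.

```python
def to_udm(raw: dict) -> dict:
    """Map a raw audit log record to a UDM-like dict following the mapping table."""
    udm = {
        "metadata": {
            # Hardcoded values listed in the table
            "event_type": "USER_UNCATEGORIZED",
            "log_type": "SNYK_SDLC",
            "product_name": "SNYK SDLC",
            "vendor_name": "SNYK_SDLC",
        },
        "principal": {"user": {}},
    }
    if "created" in raw:
        udm["metadata"]["event_timestamp"] = raw["created"]
    if "event" in raw:
        udm["metadata"]["product_event_type"] = raw["event"]
    url = (raw.get("content") or {}).get("url")
    if url:
        udm["principal"]["url"] = url
    user = udm["principal"]["user"]
    if "userId" in raw:
        user["userid"] = raw["userId"]
    if "groupId" in raw:
        user["group_identifiers"] = [raw["groupId"]]
    if "orgId" in raw:
        user["attribute"] = {"labels": [{"key": "orgId", "value": raw["orgId"]}]}
    return udm


# Made-up sample record for illustration only.
sample = {
    "created": "2024-01-01T00:00:00Z",
    "event": "api.access",
    "groupId": "group-1",
    "orgId": "org-1",
    "userId": "user-1",
    "content": {"url": "/api/v1/example"},
}
result = to_udm(sample)
print(result["metadata"]["product_event_type"])  # api.access
print(result["principal"]["user"]["userid"])     # user-1
```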

Need more help? Get answers from Community members and Google SecOps professionals.
