Collect Snyk group-level audit and issues logs

This guide explains how you can ingest Snyk group-level audit and issues logs to Google Security Operations using Amazon S3.

Before you begin

Make sure you have the following prerequisites:

  • Google SecOps instance
  • Privileged access to Snyk Group (API token with read access; Group ID)
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Get Snyk Group ID and API token

  1. In the Snyk UI, go to Account settings > API token and generate the API token.
  2. Copy and save the token in a secure location to use later as SNYK_TOKEN.
  3. Switch to your Group and open Group settings.
  4. Copy and save the Group ID from the URL (https://app.snyk.io/group/<GROUP_ID>/...) to use later as GROUP_ID.
  5. Note the base API endpoint: https://api.snyk.io (override with API_BASE if required).
  6. Assign the Group Admin role to the user with the token. (The user must be able to view Group Audit Logs and Group Issues.)
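
To show how the two saved values are used, here is a minimal sketch that builds (but does not send) a request for Group audit logs. It assumes the same endpoint path, version parameter, and "token" authorization header that the Lambda code later in this guide uses. The helper name build_audit_request and the placeholder token and Group ID are hypothetical.

```python
import urllib.parse
from urllib.request import Request


def build_audit_request(token: str, group_id: str,
                        api_base: str = "https://api.snyk.io",
                        version: str = "2021-06-04") -> Request:
    """Build (but do not send) a GET request for one Group audit log record."""
    query = urllib.parse.urlencode({"version": version, "size": 1})
    url = f"{api_base.rstrip('/')}/rest/groups/{group_id}/audit_logs/search?{query}"
    headers = {
        "Authorization": f"token {token}",        # same scheme as the Lambda code
        "Accept": "application/vnd.api+json",
    }
    return Request(url, method="GET", headers=headers)


# Placeholder values only; substitute the SNYK_TOKEN and GROUP_ID you saved.
req = build_audit_request("example-token", "00000000-0000-0000-0000-000000000000")
print(req.full_url)
```

Passing the request to urllib.request.urlopen(req, timeout=60) would send it. A successful response would suggest that the token can read Group audit logs, while an HTTP 401 or 403 error would most likely point to a token or role problem.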

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, snyk-group-logs).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for later use.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for and select the AmazonS3FullAccess policy.
  18. Click Next.
  19. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Enter the following policy (includes write access for all objects under the bucket and read access to the state file your Lambda uses):

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "PutAllSnykGroupObjects",
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject"],
            "Resource": "arn:aws:s3:::snyk-group-logs/*"
          }
        ]
      }
    
    • Replace snyk-group-logs if you entered a different bucket name.
  3. Click Next > Create policy.

  4. Go to IAM > Roles > Create role > AWS service > Lambda.

  5. Attach the newly created policy.

  6. Name the role WriteSnykGroupToS3Role and click Create role.
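
If your bucket is not named snyk-group-logs, the policy from step 2 can be generated rather than edited by hand. The following is a minimal sketch under that assumption; build_upload_policy is a hypothetical helper, and its output is meant to be pasted into the JSON tab.

```python
import json


def build_upload_policy(bucket: str) -> dict:
    """Return the upload policy from step 2 with the bucket name filled in."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "PutAllSnykGroupObjects",
                "Effect": "Allow",
                # PutObject writes log pages; GetObject reads the state file.
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            }
        ],
    }


print(json.dumps(build_upload_policy("snyk-group-logs"), indent=2))
```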

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:
    Setting           Value
    Name              snyk_group_audit_issues_to_s3
    Runtime           Python 3.13
    Architecture      x86_64
    Execution role    WriteSnykGroupToS3Role

  1. After the function is created, open the Code tab, delete the stub, and enter the following code (snyk_group_audit_issues_to_s3.py):

      #!/usr/bin/env python3
      # Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)

      import os
      import json
      import time
      import urllib.parse
      from urllib.request import Request, urlopen
      from urllib.parse import urlparse, parse_qs
      from urllib.error import HTTPError
      import boto3

      API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
      SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
      GROUP_ID = os.environ["GROUP_ID"].strip()
      BUCKET = os.environ["S3_BUCKET"].strip()
      PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
      STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()

      # Page sizes & limits
      AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))      # audit uses 'size' (max 100)
      ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))  # issues uses 'limit'
      MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))

      # API versions (Snyk REST requires a 'version' param)
      AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
      ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()

      # First-run lookback for audit to avoid huge backfills
      LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))

      HDRS = {
          "Authorization": f"token {SNYK_TOKEN}",
          "Accept": "application/vnd.api+json",
      }

      s3 = boto3.client("s3")


      def _get_state() -> dict:
          try:
              obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
              return json.loads(obj["Body"].read() or b"{}")
          except Exception:
              return {}


      def _put_state(state: dict):
          s3.put_object(
              Bucket=BUCKET,
              Key=STATE_KEY,
              Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )


      def _iso(ts: float) -> str:
          return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))


      def _http_get(url: str) -> dict:
          req = Request(url, method="GET", headers=HDRS)
          try:
              with urlopen(req, timeout=60) as r:
                  return json.loads(r.read().decode("utf-8"))
          except HTTPError as e:
              if e.code in (429, 500, 502, 503, 504):
                  delay = int(e.headers.get("Retry-After", "1"))
                  time.sleep(max(1, delay))
                  with urlopen(req, timeout=60) as r2:
                      return json.loads(r2.read().decode("utf-8"))
              raise


      def _write_page(kind: str, payload: dict) -> str:
          ts = time.gmtime()
          key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
          s3.put_object(
              Bucket=BUCKET,
              Key=key,
              Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )
          return key


      def _next_href(links: dict | None) -> str | None:
          if not links:
              return None
          nxt = links.get("next")
          if not nxt:
              return None
          if isinstance(nxt, str):
              return nxt
          if isinstance(nxt, dict):
              return nxt.get("href")
          return None


      # -------- Audit Logs --------
      def pull_audit_logs(state: dict) -> dict:
          cursor = state.get("audit_cursor")
          pages = 0
          total = 0
          base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
          params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
          if cursor:
              params["cursor"] = cursor
          else:
              now = time.time()
              params["from"] = _iso(now - LOOKBACK_SECONDS)
              params["to"] = _iso(now)

          while pages < MAX_PAGES:
              url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
              payload = _http_get(url)
              _write_page("audit", payload)
              data_items = (payload.get("data") or {}).get("items") or []
              if isinstance(data_items, list):
                  total += len(data_items)
              nxt = _next_href(payload.get("links"))
              if not nxt:
                  break
              q = parse_qs(urlparse(nxt).query)
              cur = (q.get("cursor") or [None])[0]
              if not cur:
                  break
              params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
              state["audit_cursor"] = cur
              pages += 1

          return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}


      # -------- Issues --------
      def pull_issues(state: dict) -> dict:
          cursor = state.get("issues_cursor")  # stores 'starting_after'
          pages = 0
          total = 0
          base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
          params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
          if cursor:
              params["starting_after"] = cursor

          while pages < MAX_PAGES:
              url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
              payload = _http_get(url)
              _write_page("issues", payload)
              data_items = payload.get("data") or []
              if isinstance(data_items, list):
                  total += len(data_items)
              nxt = _next_href(payload.get("links"))
              if not nxt:
                  break
              q = parse_qs(urlparse(nxt).query)
              cur = (q.get("starting_after") or [None])[0]
              if not cur:
                  break
              params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
              state["issues_cursor"] = cur
              pages += 1

          return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}


      def lambda_handler(event=None, context=None):
          state = _get_state()
          audit_res = pull_audit_logs(state)
          issues_res = pull_issues(state)
          _put_state(state)
          return {"ok": True, "audit": audit_res, "issues": issues_res}


      if __name__ == "__main__":
          print(lambda_handler())
    
  2. Go to Configuration > Environment variables > Edit > Add new environment variable.

  3. Enter the following environment variables, replacing with your values:

    Key                        Example
    S3_BUCKET                  snyk-group-logs
    S3_PREFIX                  snyk/group/
    STATE_KEY                  snyk/group/state.json
    SNYK_TOKEN                 xxxxxxxx-xxxx-xxxx-xxxx-xxxx
    GROUP_ID                   <group_uuid>
    API_BASE                   https://api.snyk.io
    SNYK_AUDIT_API_VERSION     2021-06-04
    SNYK_ISSUES_API_VERSION    2024-10-15
    AUDIT_PAGE_SIZE            100
    ISSUES_PAGE_LIMIT          200
    MAX_PAGES                  20
    LOOKBACK_SECONDS           3600
  4. After the function is created, stay on its page (or open Lambda > Functions > <your-function>).

  5. Select the Configuration tab.

  6. In the General configuration panel, click Edit.

  7. Change Timeout to 5 minutes (300 seconds) and click Save.
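
The function code above resumes between runs by pulling a cursor out of the links.next value of each response and saving it to the state file. The following is a minimal, standalone sketch of that extraction step; cursor_from_links is a hypothetical helper that mirrors the logic of _next_href plus the parse_qs lookup, and the sample link values are made up for illustration.

```python
from urllib.parse import parse_qs, urlparse


def cursor_from_links(links, param):
    """Return the value of `param` in links['next'], or None if there is no next page."""
    nxt = (links or {}).get("next")
    if isinstance(nxt, dict):          # accept the {"href": "..."} form as well
        nxt = nxt.get("href")
    if not isinstance(nxt, str) or not nxt:
        return None
    values = parse_qs(urlparse(nxt).query).get(param)
    return values[0] if values else None


# Illustrative (made-up) link objects in the two shapes the function accepts.
issues_links = {"next": "/rest/groups/g/issues?version=2024-10-15&limit=200&starting_after=abc123"}
audit_links = {"next": {"href": "https://api.snyk.io/rest/groups/g/audit_logs/search?cursor=xyz"}}

print(cursor_from_links(issues_links, "starting_after"))
print(cursor_from_links(audit_links, "cursor"))
print(cursor_from_links({}, "cursor"))
```

When no cursor is found, the paging loop stops. Otherwise the latest cursor is written to the state file, so the next scheduled run continues from that point instead of repeating the first-run lookback window.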

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: Your Lambda function snyk_group_audit_issues_to_s3.
    • Name: snyk-group-audit-issues-1h.
  3. Click Create schedule.
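
For readers who script their AWS setup, the console settings above map onto a small set of request parameters. This is a sketch only: it assumes the boto3 EventBridge Scheduler client ("scheduler") and its create_schedule call, and the ARNs are hypothetical placeholders. The role that EventBridge Scheduler assumes to invoke the function is not covered in this guide and must already exist.

```python
def build_schedule_request(lambda_arn: str, role_arn: str,
                           name: str = "snyk-group-audit-issues-1h") -> dict:
    """Keyword arguments describing an hourly schedule that targets the Lambda."""
    return {
        "Name": name,
        "ScheduleExpression": "rate(1 hour)",   # expression form of "Rate (1 hour)"
        "FlexibleTimeWindow": {"Mode": "OFF"},  # run at the scheduled time, no window
        "Target": {"Arn": lambda_arn, "RoleArn": role_arn},
    }


# Placeholder ARNs (hypothetical account ID, region, and role name).
request = build_schedule_request(
    "arn:aws:lambda:us-east-1:123456789012:function:snyk_group_audit_issues_to_s3",
    "arn:aws:iam::123456789012:role/example-scheduler-invoke-role",
)
print(request)
```

With boto3 installed and credentials configured, the dictionary could be passed as boto3.client("scheduler").create_schedule(**request).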

Optional: Create read-only IAM user & keys for Google SecOps

  1. In the AWS Console, go to IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: secops-reader.
    • Access type: Access key - Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::snyk-group-logs/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::snyk-group-logs"
          }
        ]
      }
    
  7. Set the name to secops-reader-policy.

  8. Click Create policy, search for and select secops-reader-policy, and then click Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV (these values are entered into the feed).

Configure a feed in Google SecOps to ingest the Snyk Group level audit and issues logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Snyk Group Audit/Issues).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Snyk Group level audit/issues logs as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://snyk-group-logs/snyk/group/
    • Source deletion options: Select deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: snyk.group
    • Ingestion labels: Add if desired.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
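
The S3 URI in step 7 has to point at the prefix that the Lambda function writes under. The following minimal sketch shows how the two relate, assuming the default S3_BUCKET and S3_PREFIX values from this guide. Both helpers are hypothetical; object_key mirrors the key layout of the _write_page function in the Lambda code.

```python
import time


def feed_s3_uri(bucket: str, prefix: str) -> str:
    """S3 URI for the feed: the bucket plus the prefix the Lambda writes under."""
    return f"s3://{bucket}/{prefix.strip('/')}/"


def object_key(prefix: str, kind: str, ts: time.struct_time) -> str:
    """Key layout used for each stored page: <prefix>/YYYY/MM/DD/HHMMSS-snyk-<kind>.json"""
    return f"{prefix.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"


uri = feed_s3_uri("snyk-group-logs", "snyk/group/")
key = object_key("snyk/group/", "audit", time.gmtime())
print(uri)
print(key)
# Every page key starts with the prefix that the feed URI points at.
print(f"s3://snyk-group-logs/{key}".startswith(uri))
```

With the defaults in this guide, the state file (snyk/group/state.json) sits under the same prefix as the log pages, so the feed may also encounter it when listing objects.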

Need more help? Get answers from Community members and Google SecOps professionals.
