Collect Snyk group-level audit and issues logs
This guide explains how you can ingest Snyk group-level audit and issues logs to Google Security Operations using Amazon S3.
Before you begin
Make sure you have the following prerequisites:
- Google SecOps instance
- Privileged access to Snyk Group (API token with read access; Group ID)
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Get Snyk Group ID and API token
- In the Snyk UI, go to Account settings > API token and generate the API token.
- Copy and save the token in a secure location to later use as `SNYK_TOKEN`.
- Switch to your Group and open Group settings.
- Copy and save the Group ID from the URL (`https://app.snyk.io/group/<GROUP_ID>/...`) to later use as `GROUP_ID`.
- Base API endpoint: `https://api.snyk.io` (override with `API_BASE` if required).
- Assign the Group Admin role to the user with the token (the user must be able to view Group Audit Logs and Group Issues).
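To confirm the token and Group ID work before building the pipeline, you can call the Group audit logs endpoint directly. The following is a minimal sketch using only the Python standard library; the endpoint path, header format, and version value mirror the Lambda code later in this guide, and the token and Group ID values are placeholders you must replace:

```python
# Sketch: verify SNYK_TOKEN and GROUP_ID against the Group audit logs endpoint.
import json
from urllib.request import Request, urlopen

SNYK_TOKEN = "xxxxxxxx-xxxx-xxxx-xxxx-xxxx"  # placeholder: your API token
GROUP_ID = "<group_uuid>"                    # placeholder: your Group ID

url = (
    f"https://api.snyk.io/rest/groups/{GROUP_ID}/audit_logs/search"
    "?version=2021-06-04&size=1"
)
req = Request(url, headers={
    "Authorization": f"token {SNYK_TOKEN}",
    "Accept": "application/vnd.api+json",
})
with urlopen(req, timeout=30) as r:
    print(r.status)                       # expect 200 if the token has access
    print(json.loads(r.read()).get("data"))  # one audit log entry, if any exist
```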
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, `snyk-group-logs`).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
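Optionally, confirm the new keys can write to the bucket before moving on. A minimal sketch, assuming boto3 is installed and using the key pair from the downloaded CSV; the key names and object key below are illustrative placeholders:

```python
# Sketch: confirm the IAM user's keys can write to and read from the new bucket.
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIA...",   # placeholder: Access Key from the CSV
    aws_secret_access_key="...",   # placeholder: Secret Access Key from the CSV
)

# Write and read back a small marker object under the prefix the Lambda will use.
s3.put_object(
    Bucket="snyk-group-logs",
    Key="snyk/group/connectivity-check.json",
    Body=b'{"ok": true}',
    ContentType="application/json",
)
print(s3.get_object(Bucket="snyk-group-logs",
                    Key="snyk/group/connectivity-check.json")["Body"].read())
```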
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
- Enter the following policy (it grants write access for all objects under the bucket and read access to the state file your Lambda uses):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PutAllSnykGroupObjects",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::snyk-group-logs/*"
    }
  ]
}
```

- Replace `snyk-group-logs` if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role `WriteSnykGroupToS3Role` and click Create role.
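For reference, a Lambda service role created this way carries a trust policy similar to the following; you only need to set it yourself if you create the role outside the console:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```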
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

| Setting | Value |
|---|---|
| Name | `snyk_group_audit_issues_to_s3` |
| Runtime | Python 3.13 |
| Architecture | x86_64 |
| Execution role | `WriteSnykGroupToS3Role` |
- After the function is created, open the Code tab, delete the stub, and enter the following code (`snyk_group_audit_issues_to_s3.py`). An optional local test sketch follows this procedure.

```python
#!/usr/bin/env python3
# Lambda: Pull Snyk Group-level Audit Logs + Issues to S3 (no transform)
import os
import json
import time
import urllib.parse
from urllib.request import Request, urlopen
from urllib.parse import urlparse, parse_qs
from urllib.error import HTTPError

import boto3

API_BASE = os.environ.get("API_BASE", "https://api.snyk.io").rstrip("/")
SNYK_TOKEN = os.environ["SNYK_TOKEN"].strip()
GROUP_ID = os.environ["GROUP_ID"].strip()
BUCKET = os.environ["S3_BUCKET"].strip()
PREFIX = os.environ.get("S3_PREFIX", "snyk/group/").strip()
STATE_KEY = os.environ.get("STATE_KEY", "snyk/group/state.json").strip()

# Page sizes & limits
AUDIT_SIZE = int(os.environ.get("AUDIT_PAGE_SIZE", "100"))      # audit uses 'size' (max 100)
ISSUES_LIMIT = int(os.environ.get("ISSUES_PAGE_LIMIT", "200"))  # issues uses 'limit'
MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))

# API versions (Snyk REST requires a 'version' param)
AUDIT_API_VERSION = os.environ.get("SNYK_AUDIT_API_VERSION", "2021-06-04").strip()
ISSUES_API_VERSION = os.environ.get("SNYK_ISSUES_API_VERSION", "2024-10-15").strip()

# First-run lookback for audit to avoid huge backfills
LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))

HDRS = {
    "Authorization": f"token {SNYK_TOKEN}",
    "Accept": "application/vnd.api+json",
}

s3 = boto3.client("s3")


def _get_state() -> dict:
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
        return json.loads(obj["Body"].read() or b"{}")
    except Exception:
        return {}


def _put_state(state: dict):
    s3.put_object(
        Bucket=BUCKET,
        Key=STATE_KEY,
        Body=json.dumps(state, separators=(",", ":")).encode("utf-8"),
        ContentType="application/json",
    )


def _iso(ts: float) -> str:
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))


def _http_get(url: str) -> dict:
    req = Request(url, method="GET", headers=HDRS)
    try:
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    except HTTPError as e:
        # Retry once on throttling / transient server errors, honoring Retry-After.
        if e.code in (429, 500, 502, 503, 504):
            delay = int(e.headers.get("Retry-After", "1"))
            time.sleep(max(1, delay))
            with urlopen(req, timeout=60) as r2:
                return json.loads(r2.read().decode("utf-8"))
        raise


def _write_page(kind: str, payload: dict) -> str:
    ts = time.gmtime()
    key = f"{PREFIX.rstrip('/')}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-snyk-{kind}.json"
    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
        ContentType="application/json",
    )
    return key


def _next_href(links: dict | None) -> str | None:
    if not links:
        return None
    nxt = links.get("next")
    if not nxt:
        return None
    if isinstance(nxt, str):
        return nxt
    if isinstance(nxt, dict):
        return nxt.get("href")
    return None


# -------- Audit Logs --------

def pull_audit_logs(state: dict) -> dict:
    cursor = state.get("audit_cursor")
    pages = 0
    total = 0
    base = f"{API_BASE}/rest/groups/{GROUP_ID}/audit_logs/search"
    params: dict[str, object] = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE}
    if cursor:
        params["cursor"] = cursor
    else:
        now = time.time()
        params["from"] = _iso(now - LOOKBACK_SECONDS)
        params["to"] = _iso(now)
    while pages < MAX_PAGES:
        url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
        payload = _http_get(url)
        _write_page("audit", payload)
        data_items = (payload.get("data") or {}).get("items") or []
        if isinstance(data_items, list):
            total += len(data_items)
        nxt = _next_href(payload.get("links"))
        if not nxt:
            break
        q = parse_qs(urlparse(nxt).query)
        cur = (q.get("cursor") or [None])[0]
        if not cur:
            break
        params = {"version": AUDIT_API_VERSION, "size": AUDIT_SIZE, "cursor": cur}
        state["audit_cursor"] = cur
        pages += 1
    return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("audit_cursor")}


# -------- Issues --------

def pull_issues(state: dict) -> dict:
    cursor = state.get("issues_cursor")  # stores 'starting_after'
    pages = 0
    total = 0
    base = f"{API_BASE}/rest/groups/{GROUP_ID}/issues"
    params: dict[str, object] = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT}
    if cursor:
        params["starting_after"] = cursor
    while pages < MAX_PAGES:
        url = f"{base}?{urllib.parse.urlencode(params, doseq=True)}"
        payload = _http_get(url)
        _write_page("issues", payload)
        data_items = payload.get("data") or []
        if isinstance(data_items, list):
            total += len(data_items)
        nxt = _next_href(payload.get("links"))
        if not nxt:
            break
        q = parse_qs(urlparse(nxt).query)
        cur = (q.get("starting_after") or [None])[0]
        if not cur:
            break
        params = {"version": ISSUES_API_VERSION, "limit": ISSUES_LIMIT, "starting_after": cur}
        state["issues_cursor"] = cur
        pages += 1
    return {"pages": pages + 1 if total else pages, "items": total, "cursor": state.get("issues_cursor")}


def lambda_handler(event=None, context=None):
    state = _get_state()
    audit_res = pull_audit_logs(state)
    issues_res = pull_issues(state)
    _put_state(state)
    return {"ok": True, "audit": audit_res, "issues": issues_res}


if __name__ == "__main__":
    print(lambda_handler())
```
- Go to Configuration > Environment variables > Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own:

| Key | Example |
|---|---|
| `S3_BUCKET` | `snyk-group-logs` |
| `S3_PREFIX` | `snyk/group/` |
| `STATE_KEY` | `snyk/group/state.json` |
| `SNYK_TOKEN` | `xxxxxxxx-xxxx-xxxx-xxxx-xxxx` |
| `GROUP_ID` | `<group_uuid>` |
| `API_BASE` | `https://api.snyk.io` |
| `SNYK_AUDIT_API_VERSION` | `2021-06-04` |
| `SNYK_ISSUES_API_VERSION` | `2024-10-15` |
| `AUDIT_PAGE_SIZE` | `100` |
| `ISSUES_PAGE_LIMIT` | `200` |
| `MAX_PAGES` | `20` |
| `LOOKBACK_SECONDS` | `3600` |
- After the function is created, stay on its page (or open Lambda > Functions > `<your-function>`).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
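Before wiring up the schedule, you can optionally exercise the handler locally. The following is a minimal smoke-test sketch, assuming you saved the code above as `snyk_group_audit_issues_to_s3.py`, have boto3 installed, and have AWS credentials that can write to your bucket; the token and Group ID values are placeholders:

```python
# Sketch: local smoke test for snyk_group_audit_issues_to_s3.py.
import os

# The handler reads its settings from the environment, so set them before import.
os.environ.setdefault("S3_BUCKET", "snyk-group-logs")
os.environ.setdefault("S3_PREFIX", "snyk/group/")
os.environ.setdefault("STATE_KEY", "snyk/group/state.json")
os.environ.setdefault("SNYK_TOKEN", "xxxxxxxx-xxxx-xxxx-xxxx-xxxx")  # real token required
os.environ.setdefault("GROUP_ID", "<group_uuid>")                    # real Group ID required
os.environ.setdefault("MAX_PAGES", "1")                              # keep the test small

from snyk_group_audit_issues_to_s3 import lambda_handler

# Prints a summary such as {'ok': True, 'audit': {...}, 'issues': {...}}.
print(lambda_handler())
```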
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Recurring schedule: Rate (`1 hour`).
  - Target: your Lambda function `snyk_group_audit_issues_to_s3`.
  - Name: `snyk-group-audit-issues-1h`.
- Click Create schedule.
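If you prefer scripting this step, the following boto3 sketch creates the same hourly schedule. The account ID, region, and role name in the ARNs are hypothetical examples; EventBridge Scheduler needs a role that trusts `scheduler.amazonaws.com` and allows `lambda:InvokeFunction` on your function:

```python
# Sketch: create the hourly schedule with boto3 instead of the console.
import boto3

scheduler = boto3.client("scheduler")

scheduler.create_schedule(
    Name="snyk-group-audit-issues-1h",
    ScheduleExpression="rate(1 hour)",
    FlexibleTimeWindow={"Mode": "OFF"},  # fire exactly on schedule
    Target={
        # Hypothetical ARNs; substitute your function and scheduler role.
        "Arn": "arn:aws:lambda:us-east-1:123456789012:function:snyk_group_audit_issues_to_s3",
        "RoleArn": "arn:aws:iam::123456789012:role/SnykSchedulerInvokeRole",
    },
)
```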
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users and click Add users.
- Provide the following configuration details:
  - User: `secops-reader`.
  - Access type: Access key — Programmatic access.
- Click Create user.
- Attach a minimal read policy (custom): go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::snyk-group-logs/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::snyk-group-logs"
    }
  ]
}
```

- Set the name to `secops-reader-policy`.
- Click Create policy, then search for and select the policy, and click Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV file (these values are entered into the feed).
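To confirm the reader keys work before configuring the feed, you can run a quick check such as the following sketch; it assumes boto3 is installed and uses the `secops-reader` credentials from the downloaded CSV (shown as placeholders):

```python
# Sketch: verify the secops-reader keys can list and fetch objects (read-only).
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIA...",   # placeholder: Access Key ID from the CSV
    aws_secret_access_key="...",   # placeholder: Secret Access Key from the CSV
)

# List a few objects under the prefix the Lambda writes to.
resp = s3.list_objects_v2(Bucket="snyk-group-logs", Prefix="snyk/group/", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"])

# Note: a PutObject attempt with these keys should fail with AccessDenied,
# confirming the policy is read-only.
```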
Configure a feed in Google SecOps to ingest the Snyk Group level audit and issues logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, `Snyk Group Audit/Issues`).
- Select Amazon S3 V2 as the Source type.
- Select Snyk Group level audit/issues logs as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: `s3://snyk-group-logs/snyk/group/`
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Include files modified in the last number of days. Default is 180 days.
  - Access Key ID: User access key with access to the S3 bucket.
  - Secret Access Key: User secret key with access to the S3 bucket.
  - Asset namespace: `snyk.group`
  - Ingestion labels: Add if desired.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.