Collect Harness IO audit logs
This document explains how to ingest Harness IO audit logs to Google Security Operations using Amazon S3.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance
- Privileged access to Harness with permissions to:
  - Create API keys
  - Access audit logs
  - View account settings
- Privileged access to AWS (S3, IAM, Lambda, EventBridge).
Collect Harness API credentials
Create API key in Harness
- Sign in to the Harness Platform.
- Click your User Profile.
- Go to My API Keys.
- Click + API Key.
- Provide the following configuration details:
  - Name: Enter a descriptive name (for example, Google SecOps Integration).
  - Description: Optional description.
- Click Save.
- Click + Token to create a new token.
- Provide the following configuration details:
  - Name: Enter Chronicle Feed Token.
  - Set Expiration: Select an appropriate expiration time or No Expiration (for production use).
- Click Generate Token.
- Copy and save the token value securely. This token will be used as the x-api-key header value.
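The token is sent in the x-api-key header on every call to the Harness audit API. The following is a minimal sketch of such a request, using the same endpoint and body fields as the Lambda function later in this document. The helper name build_audit_request and the placeholder values are illustrative assumptions, and the request is only built, not sent.

```python
import json
import time
from urllib.parse import quote
from urllib.request import Request


def build_audit_request(account_id, api_key, since_ms, until_ms=None,
                        api_base="https://app.harness.io", page_size=50):
    """Build (but do not send) a POST request for the Harness audit list API."""
    if until_ms is None:
        until_ms = int(time.time() * 1000)
    url = (
        f"{api_base}/audit/api/audits/listV2"
        f"?accountIdentifier={quote(account_id)}&pageSize={page_size}"
    )
    body = {"startTime": since_ms, "endTime": until_ms, "filterType": "Audit"}
    headers = {
        "x-api-key": api_key,  # the token value saved in the previous step
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    return Request(url, data=json.dumps(body).encode("utf-8"),
                   headers=headers, method="POST")


# Placeholder values (assumptions); substitute your own account ID and token.
req = build_audit_request("YOUR_ACCOUNT_ID", "YOUR_TOKEN", since_ms=0, until_ms=1000)
print(req.get_method(), req.full_url)
```

Passing the returned object to urllib.request.urlopen would send the request.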
Get Harness Account ID
- In the Harness Platform, note the Account ID from the URL.
  - Example URL: https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...
  - The YOUR_ACCOUNT_ID portion is your Account Identifier.
- Alternatively, go to Account Settings > Overview to view your Account Identifier.
- Copy and save the Account ID for use in the Lambda function.
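If you prefer to extract the Account ID programmatically, the URL pattern above can be parsed as in the following sketch. The helper name account_id_from_url is hypothetical, and checking the URL fragment as well as the path is an assumption made to tolerate hash-style routes.

```python
from urllib.parse import urlsplit


def account_id_from_url(url):
    """Return the segment that follows 'account' in a Harness URL, or None."""
    parts = urlsplit(url)
    # Look at both the path and the fragment (assumption: some routes use '#/').
    combined = parts.path + "/" + parts.fragment
    segments = [s for s in combined.split("/") if s]
    for i, segment in enumerate(segments[:-1]):
        if segment == "account":
            return segments[i + 1]
    return None


# The trailing "/home" is an illustrative suffix, not a documented path.
print(account_id_from_url("https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/home"))
```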
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, harness-io-logs).
- Create a User following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: Add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for future reference.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for the AmazonS3FullAccess policy.
- Select the policy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for Lambda S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
- Copy and paste the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutHarnessObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"
          }
        ]
      }

  - Replace harness-io-logs if you entered a different bucket name.
- Click Next.
- Name the policy HarnessToS3Policy and click Create policy.
- Go to IAM > Roles > Create role.
- Select AWS service as the trusted entity type.
- Select Lambda as the use case.
- Click Next.
- Search for and select the following policies:
  - HarnessToS3Policy (the policy you just created)
  - AWSLambdaBasicExecutionRole (for CloudWatch Logs)
- Click Next.
- Name the role HarnessAuditLambdaRole and click Create role.
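If your bucket name or prefix differs from the example, the policy document can be generated rather than edited by hand. The following is a minimal sketch; the function name build_lambda_s3_policy and the bucket name my-harness-logs are illustrative assumptions, while the statements mirror the policy shown above.

```python
import json


def build_lambda_s3_policy(bucket, prefix="harness/audit"):
    """Return the Lambda upload policy for the given bucket and key prefix."""
    prefix = prefix.strip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutHarnessObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/state.json",
            },
        ],
    }


# "my-harness-logs" is a placeholder bucket name.
policy = build_lambda_s3_policy("my-harness-logs")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the JSON tab of the policy editor.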
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

  | Setting | Value |
  | --- | --- |
  | Name | harness-audit-to-s3 |
  | Runtime | Python 3.13 |
  | Architecture | x86_64 |
  | Execution role | HarnessAuditLambdaRole |

- Click Create function.
- After the function is created, open the Code tab.
- Delete the default stub code and enter the following Lambda function code:
Lambda function code (harness_audit_to_s3.py):

    #!/usr/bin/env python3
    """
    Harness.io Audit Logs to S3 Lambda
    Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion.
    """

    import os
    import json
    import time
    import uuid
    import logging
    import urllib.parse
    from datetime import datetime, timedelta, timezone
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError

    import boto3

    # Configuration from Environment Variables
    API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
    ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
    API_KEY = os.environ["HARNESS_API_KEY"]
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/")
    STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
    PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100)
    START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))

    # Optional filters (NEW)
    FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None
    FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None
    STATIC_FILTER = os.environ.get("STATIC_FILTER")  # e.g., "EXCLUDE_LOGIN_EVENTS"
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))

    # AWS clients
    s3 = boto3.client("s3")

    # HTTP headers for Harness API
    HDRS = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    # Logging configuration
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)


    # ============================================
    # State Management Functions
    # ============================================

    def _read_state():
        """Read checkpoint state from S3."""
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            state = json.loads(obj["Body"].read())
            since_ms = state.get("since")
            page_token = state.get("pageToken")
            logger.info(f"State loaded: since={since_ms}, pageToken={page_token}")
            return since_ms, page_token
        except s3.exceptions.NoSuchKey:
            logger.info("No state file found, starting fresh collection")
            start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK)
            since_ms = int(start_time.timestamp() * 1000)
            logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})")
            return since_ms, None
        except Exception as e:
            logger.error(f"Error reading state: {e}")
            raise


    def _write_state(since_ms: int, page_token: str = None):
        """Write checkpoint state to S3."""
        state = {
            "since": since_ms,
            "pageToken": page_token,
            "lastRun": int(time.time() * 1000),
            "lastRunISO": datetime.now(timezone.utc).isoformat()
        }
        try:
            s3.put_object(
                Bucket=BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state, indent=2).encode(),
                ContentType="application/json"
            )
            logger.info(f"State saved: since={since_ms}, pageToken={page_token}")
        except Exception as e:
            logger.error(f"Error writing state: {e}")
            raise


    # ============================================
    # Harness API Functions
    # ============================================

    def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0):
        """
        Fetch audit logs from Harness API with retry logic.

        API Endpoint: POST /audit/api/audits/listV2
        Documentation: https://apidocs.harness.io/audit/getauditeventlistv2
        """
        try:
            # Build URL with query parameters
            url = (
                f"{API_BASE}/audit/api/audits/listV2"
                f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
                f"&pageSize={PAGE_SIZE}"
            )
            if page_token:
                url += f"&pageToken={urllib.parse.quote(page_token)}"

            logger.info(f"Fetching from: {url[:100]}...")

            # Build request body with time filter and optional filters
            body_data = {
                "startTime": since_ms,
                "endTime": int(time.time() * 1000),
                "filterType": "Audit"
            }

            if FILTER_MODULES:
                body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()]
                logger.info(f"Applying module filter: {body_data['modules']}")

            if FILTER_ACTIONS:
                body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()]
                logger.info(f"Applying action filter: {body_data['actions']}")

            if STATIC_FILTER:
                body_data["staticFilter"] = STATIC_FILTER
                logger.info(f"Applying static filter: {STATIC_FILTER}")

            logger.debug(f"Request body: {json.dumps(body_data)}")

            # Make POST request
            req = Request(
                url,
                data=json.dumps(body_data).encode('utf-8'),
                headers=HDRS,
                method="POST"
            )
            resp = urlopen(req, timeout=30)
            resp_text = resp.read().decode('utf-8')
            resp_data = json.loads(resp_text)

            if "status" not in resp_data:
                logger.warning(f"Response missing 'status' field: {resp_text[:200]}")

            # Check response status
            if resp_data.get("status") != "SUCCESS":
                error_msg = resp_data.get("message", "Unknown error")
                raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}")

            # Extract data from response structure
            data_obj = resp_data.get("data", {})
            if not data_obj:
                logger.warning("Response 'data' object is empty or missing")

            events = data_obj.get("content", [])
            has_next = data_obj.get("hasNext", False)
            next_token = data_obj.get("pageToken")

            logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}")

            if not events and data_obj:
                logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}")

            return {
                "events": events,
                "hasNext": has_next,
                "pageToken": next_token
            }

        except HTTPError as e:
            error_body = e.read().decode() if hasattr(e, 'read') else ''

            if e.code == 401:
                logger.error("Authentication failed: Invalid API key")
                raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.")
            elif e.code == 403:
                logger.error("Authorization failed: Insufficient permissions")
                raise Exception("API key lacks required audit:read permissions")
            elif e.code == 429:
                retry_after = int(e.headers.get("Retry-After", "60"))
                logger.warning(
                    f"Rate limit exceeded. Retry after {retry_after} seconds "
                    f"(attempt {retry_count + 1}/{MAX_RETRIES})"
                )
                if retry_count < MAX_RETRIES:
                    logger.info(f"Waiting {retry_after} seconds before retry...")
                    time.sleep(retry_after)
                    logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})")
                    return _fetch_harness_audits(since_ms, page_token, retry_count + 1)
                else:
                    raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting")
            elif e.code == 400:
                logger.error(f"Bad request: {error_body}")
                raise Exception(f"Invalid request parameters: {error_body}")
            else:
                logger.error(f"HTTP {e.code}: {e.reason} - {error_body}")
                raise Exception(f"Harness API error {e.code}: {e.reason}")

        except URLError as e:
            logger.error(f"Network error: {e.reason}")
            raise Exception(f"Failed to connect to Harness API: {e.reason}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response: {e}")
            logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}")
            raise Exception("Harness API returned invalid JSON")
        except Exception as e:
            logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True)
            raise


    # ============================================
    # S3 Upload Functions
    # ============================================

    def _upload_to_s3(events: list) -> str:
        """
        Upload audit events to S3 in JSONL format.
        Each line is a complete JSON object (one event per line).
        """
        if not events:
            logger.info("No events to upload")
            return None

        try:
            # Create JSONL content (one JSON object per line)
            jsonl_lines = [json.dumps(event) for event in events]
            jsonl_content = "\n".join(jsonl_lines)

            # Generate S3 key with timestamp and UUID
            timestamp = datetime.now(timezone.utc)
            key = (
                f"{PREFIX}/"
                f"{timestamp:%Y/%m/%d}/"
                f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl"
            )

            # Upload to S3
            s3.put_object(
                Bucket=BUCKET,
                Key=key,
                Body=jsonl_content.encode('utf-8'),
                ContentType="application/x-ndjson",
                Metadata={
                    "event-count": str(len(events)),
                    "source": "harness-audit-lambda",
                    "collection-time": timestamp.isoformat()
                }
            )

            logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}")
            return key

        except Exception as e:
            logger.error(f"Error uploading to S3: {e}", exc_info=True)
            raise


    # ============================================
    # Main Orchestration Function
    # ============================================

    def fetch_and_store():
        """
        Main function to fetch audit logs from Harness and store in S3.
        Handles pagination and state management.
        """
        logger.info("=== Harness Audit Collection Started ===")
        logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}")

        if FILTER_MODULES:
            logger.info(f"Module filter enabled: {FILTER_MODULES}")
        if FILTER_ACTIONS:
            logger.info(f"Action filter enabled: {FILTER_ACTIONS}")
        if STATIC_FILTER:
            logger.info(f"Static filter enabled: {STATIC_FILTER}")

        try:
            # Step 1: Read checkpoint state
            since_ms, page_token = _read_state()

            if page_token:
                logger.info("Resuming pagination from saved pageToken")
            else:
                since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc)
                logger.info(f"Starting new collection from: {since_dt.isoformat()}")

            # Step 2: Collect all events with pagination
            all_events = []
            current_page_token = page_token
            page_count = 0
            max_pages = 100  # Safety limit
            has_next = True

            while has_next and page_count < max_pages:
                page_count += 1
                logger.info(f"--- Fetching page {page_count} ---")

                # Fetch one page of results
                result = _fetch_harness_audits(since_ms, current_page_token)

                # Extract events
                events = result.get("events", [])
                all_events.extend(events)
                logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})")

                # Check pagination status
                has_next = result.get("hasNext", False)
                current_page_token = result.get("pageToken")

                if not has_next:
                    logger.info("Pagination complete (hasNext=False)")
                    break

                if not current_page_token:
                    logger.warning("hasNext=True but no pageToken, stopping pagination")
                    break

                # Small delay between pages to avoid rate limiting
                time.sleep(0.5)

            if page_count >= max_pages:
                logger.warning(f"Reached max pages limit ({max_pages}), stopping")

            # Step 3: Upload collected events to S3
            if all_events:
                s3_key = _upload_to_s3(all_events)
                logger.info(f"Successfully uploaded {len(all_events)} total events")
            else:
                logger.info("No new events to upload")
                s3_key = None

            # Step 4: Update checkpoint state
            if not has_next:
                # Pagination complete - update since to current time for next run
                new_since = int(time.time() * 1000)
                _write_state(new_since, None)
                logger.info(f"Pagination complete, state updated with new since={new_since}")
            else:
                # Pagination incomplete - save pageToken for continuation
                _write_state(since_ms, current_page_token)
                logger.info("Pagination incomplete, saved pageToken for next run")

            # Step 5: Return result
            result = {
                "statusCode": 200,
                "message": "Success",
                "eventsCollected": len(all_events),
                "pagesProcessed": page_count,
                "paginationComplete": not has_next,
                "s3Key": s3_key,
                "filters": {
                    "modules": FILTER_MODULES,
                    "actions": FILTER_ACTIONS,
                    "staticFilter": STATIC_FILTER
                }
            }
            logger.info(f"Collection completed: {json.dumps(result)}")
            return result

        except Exception as e:
            logger.error(f"Collection failed: {e}", exc_info=True)
            result = {
                "statusCode": 500,
                "message": "Error",
                "error": str(e),
                "errorType": type(e).__name__
            }
            return result
        finally:
            logger.info("=== Harness Audit Collection Finished ===")


    # ============================================
    # Lambda Handler
    # ============================================

    def lambda_handler(event, context):
        """AWS Lambda handler function."""
        return fetch_and_store()


    # ============================================
    # Local Testing
    # ============================================

    if __name__ == "__main__":
        # For local testing
        result = lambda_handler(None, None)
        print(json.dumps(result, indent=2))
- Click Deploy to save the function code.
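The pagination and checkpoint behavior of the function can be explored without AWS or Harness access. The following is a simplified sketch of the loop in fetch_and_store, with the API call replaced by an injected function; the names collect_pages and fake_fetch and the sample pages are hypothetical, and the sketch omits the delay between pages, retries, and S3 writes.

```python
def collect_pages(fetch_page, since_ms, page_token=None, max_pages=100):
    """Drain pages from fetch_page and report which checkpoint to save.

    fetch_page(since_ms, page_token) must return a dict with the keys
    "events", "hasNext" and "pageToken", like _fetch_harness_audits does.
    """
    events, pages, has_next = [], 0, True
    while has_next and pages < max_pages:
        pages += 1
        result = fetch_page(since_ms, page_token)
        events.extend(result.get("events", []))
        has_next = result.get("hasNext", False)
        page_token = result.get("pageToken")
        if has_next and not page_token:
            break  # cannot continue without a token
    return {
        "events": events,
        "complete": not has_next,  # True: advance "since" to the current time
        "resume_token": page_token if has_next else None,  # saved for the next run
    }


def fake_fetch(since_ms, page_token):
    """Stand-in for the Harness API: two pages of made-up events."""
    pages = {
        None: {"events": [{"id": 1}, {"id": 2}], "hasNext": True, "pageToken": "p2"},
        "p2": {"events": [{"id": 3}], "hasNext": False, "pageToken": None},
    }
    return pages[page_token]


full = collect_pages(fake_fetch, since_ms=0)
partial = collect_pages(fake_fetch, since_ms=0, max_pages=1)
print(len(full["events"]), full["complete"], full["resume_token"])
print(len(partial["events"]), partial["complete"], partial["resume_token"])
```

When a run stops at the page limit, the events gathered so far are still uploaded and the saved token lets the next scheduled run continue from the following page.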
Configure Lambda environment variables
- In the Lambda function page, select the Configuration tab.
- Click Environment variables in the left sidebar.
- Click Edit.
- Click Add environment variable for each of the following:

  Required environment variables:

  | Key | Value | Description |
  | --- | --- | --- |
  | HARNESS_ACCOUNT_ID | Your Harness Account ID | Account identifier from Harness |
  | HARNESS_API_KEY | Your API key token | Token with audit:read permissions |
  | S3_BUCKET | harness-io-logs | S3 bucket name |
  | S3_PREFIX | harness/audit | Prefix for S3 objects |
  | STATE_KEY | harness/audit/state.json | State file path in S3 |

  Optional environment variables:

  | Key | Default Value | Description |
  | --- | --- | --- |
  | HARNESS_API_BASE | https://app.harness.io | Harness API base URL |
  | PAGE_SIZE | 50 | Events per page (max 100) |
  | START_MINUTES_BACK | 60 | Initial lookback period in minutes |
  | FILTER_MODULES | None | Comma-separated modules (e.g., CD,CI,CE) |
  | FILTER_ACTIONS | None | Comma-separated actions (e.g., CREATE,UPDATE,DELETE) |
  | STATIC_FILTER | None | Pre-defined filter: EXCLUDE_LOGIN_EVENTS or EXCLUDE_SYSTEM_EVENTS |
  | MAX_RETRIES | 3 | Max retry attempts for rate limiting |

- Click Save.
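To check a set of values before saving them, the parsing rules that the function applies to these variables can be reproduced locally. The following is a minimal sketch; load_config and the sample values are hypothetical, while the defaults, the 100-event page cap, and the comma splitting follow the Lambda code above. Note that the code itself only fails on a missing HARNESS_ACCOUNT_ID, HARNESS_API_KEY, or S3_BUCKET, because S3_PREFIX and STATE_KEY have defaults.

```python
def load_config(env):
    """Parse settings from a mapping such as os.environ, applying the defaults."""
    def csv(name):
        # Split a comma-separated value and drop empty entries.
        items = [item.strip() for item in env.get(name, "").split(",") if item.strip()]
        return items or None

    required = ("HARNESS_ACCOUNT_ID", "HARNESS_API_KEY", "S3_BUCKET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing required environment variables: {', '.join(missing)}")

    return {
        "account_id": env["HARNESS_ACCOUNT_ID"],
        "api_key": env["HARNESS_API_KEY"],
        "bucket": env["S3_BUCKET"],
        "prefix": env.get("S3_PREFIX", "harness/audit").strip("/"),
        "state_key": env.get("STATE_KEY", "harness/audit/state.json"),
        "api_base": env.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/"),
        "page_size": min(int(env.get("PAGE_SIZE", "50")), 100),
        "start_minutes_back": int(env.get("START_MINUTES_BACK", "60")),
        "modules": csv("FILTER_MODULES"),
        "actions": csv("FILTER_ACTIONS"),
        "static_filter": env.get("STATIC_FILTER"),
        "max_retries": int(env.get("MAX_RETRIES", "3")),
    }


# Sample values (assumptions); in Lambda you would pass os.environ instead.
cfg = load_config({
    "HARNESS_ACCOUNT_ID": "YOUR_ACCOUNT_ID",
    "HARNESS_API_KEY": "YOUR_TOKEN",
    "S3_BUCKET": "harness-io-logs",
    "PAGE_SIZE": "500",
    "FILTER_MODULES": "CD, CI,",
})
print(cfg["page_size"], cfg["modules"], cfg["prefix"])
```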
Configure Lambda timeout and memory
- In the Lambda function page, select the Configuration tab.
- Click General configuration in the left sidebar.
- Click Edit.
- Provide the following configuration details:
  - Memory: 256 MB (recommended)
  - Timeout: 5 min 0 sec (300 seconds)
- Click Save.
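As a rough sanity check on the 300-second timeout, the worst-case paging work of a single run can be estimated from the limits in the function code (100 pages per run, 0.5 seconds between pages, 50 events per page by default). The following sketch is only an estimate: run_budget is a hypothetical helper, and the one-second average request time is an assumption that you should replace with a value observed in CloudWatch Logs.

```python
def run_budget(max_pages=100, page_size=50, delay_s=0.5, avg_request_s=1.0):
    """Return (maximum events per run, rough seconds spent paging)."""
    max_events = max_pages * page_size
    est_seconds = max_pages * (avg_request_s + delay_s)
    return max_events, est_seconds


events_per_run, seconds = run_budget()
print(events_per_run, seconds, seconds < 300)
```

Waits triggered by rate limiting (the Retry-After value, 60 seconds when the header is absent) also count against the timeout and are not included in this estimate.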
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Schedule name: Enter harness-audit-hourly.
  - Description: Optional description.
- Click Next.
- Under Schedule pattern, select Recurring schedule.
- Select Rate-based schedule.
- Provide the following configuration details:
  - Rate expression: Enter 1 hour.
- Click Next.
- Under Target, provide the following configuration details:
  - Target API: Select AWS Lambda Invoke.
  - Lambda function: Select your function harness-audit-to-s3.
- Click Next.
- Review the schedule configuration.
- Click Create schedule.
Create read-only IAM user for Google SecOps
This IAM user allows Google SecOps to read logs from the S3 bucket.
- Go to AWS Console > IAM > Users > Create user.
- Provide the following configuration details:
  - User name: Enter chronicle-s3-reader.
- Click Next.
- Select Attach policies directly.
- Click Create policy.
- Select the JSON tab.
- Paste the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::harness-io-logs",
            "Condition": {
              "StringLike": {
                "s3:prefix": "harness/audit/*"
              }
            }
          }
        ]
      }

- Click Next.
- Name the policy ChronicleHarnessS3ReadPolicy.
- Click Create policy.
- Return to the user creation tab and refresh the policies list.
- Search for and select ChronicleHarnessS3ReadPolicy.
- Click Next.
- Review and click Create user.
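Before attaching the policy, you may want to confirm that it grants read-only access. The following is a minimal sketch that summarizes the actions per resource and flags anything other than Get or List actions; the helper actions_by_resource is hypothetical, and the check is a simple string comparison, not an evaluation of IAM semantics.

```python
import json

READER_POLICY = """
{
  "Version": "2012-10-17",
  "Statement": [
    {"Effect": "Allow", "Action": ["s3:GetObject"],
     "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"},
    {"Effect": "Allow", "Action": ["s3:ListBucket"],
     "Resource": "arn:aws:s3:::harness-io-logs",
     "Condition": {"StringLike": {"s3:prefix": "harness/audit/*"}}}
  ]
}
"""


def actions_by_resource(policy_json):
    """Map each resource ARN in the policy to the set of allowed actions."""
    summary = {}
    for statement in json.loads(policy_json)["Statement"]:
        actions = statement["Action"]
        resources = statement["Resource"]
        # IAM accepts either a single string or a list for both fields.
        actions = [actions] if isinstance(actions, str) else actions
        resources = [resources] if isinstance(resources, str) else resources
        for resource in resources:
            summary.setdefault(resource, set()).update(actions)
    return summary


summary = actions_by_resource(READER_POLICY)
non_read = {action for actions in summary.values() for action in actions
            if not action.startswith(("s3:Get", "s3:List"))}
print(summary)
print("Non-read actions:", non_read)
```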
Create access keys for the reader user
- In the IAM Users page, select the chronicle-s3-reader user.
- Select the Security credentials tab.
- Click Create access key.
- Select Third-party service as the use case.
- Click Next.
- Optional: Add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key ID and Secret Access Key.
- Click Done.
Configure a feed in Google SecOps to ingest Harness IO logs
- Go to SIEM Settings > Feeds.
- Click Add new.
- On the next page, click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, Harness Audit Logs).
- Select Amazon S3 V2 as the Source type.
- Select Harness IO as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: Enter the S3 bucket URI with the prefix path: s3://harness-io-logs/harness/audit/
  - Source deletion option: Select the deletion option according to your preference:
    - Never: Never deletes any files after transfers (recommended initially).
    - On success: Deletes all files and empty directories after successful transfer.
  - Maximum File Age: Include files modified within the specified number of days. Default is 180 days.
  - Access Key ID: Enter the Access Key ID from the chronicle-s3-reader user.
  - Secret Access Key: Enter the Secret Access Key from the chronicle-s3-reader user.
  - Asset namespace: The asset namespace. Enter harness.audit.
  - Ingestion labels: Optional labels to be applied to events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
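The S3 URI entered in the feed must point at the same bucket and prefix that the Lambda function writes to. The following is a minimal sketch for checking this; split_s3_uri and feed_covers_key are hypothetical helpers, and the sample object key only imitates the naming pattern used by the function.

```python
from urllib.parse import urlsplit


def split_s3_uri(uri):
    """Split an s3://bucket/prefix URI into (bucket, prefix)."""
    parts = urlsplit(uri)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parts.netloc, parts.path.lstrip("/")


def feed_covers_key(feed_uri, bucket, key):
    """Return True if an object in the given bucket falls under the feed URI."""
    feed_bucket, feed_prefix = split_s3_uri(feed_uri)
    return bucket == feed_bucket and key.startswith(feed_prefix)


FEED_URI = "s3://harness-io-logs/harness/audit/"
# Made-up key following the harness-audit-<timestamp>-<uuid>.jsonl pattern.
sample_key = "harness/audit/2025/01/15/harness-audit-20250115-120000-example.jsonl"

print(split_s3_uri(FEED_URI))
print(feed_covers_key(FEED_URI, "harness-io-logs", sample_key))
```

With the defaults in this document, the state file harness/audit/state.json also falls under this prefix.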

