Collect Harness IO audit logs

This document explains how to ingest Harness IO audit logs to Google Security Operations using Amazon S3.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • Privileged access to Harness with permissions to:
    • Create API keys
    • Access audit logs
    • View account settings
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Collect Harness API credentials

Create API key in Harness

  1. Sign in to the Harness Platform.
  2. Click your User Profile.
  3. Go to My API Keys.
  4. Click + API Key.
  5. Provide the following configuration details:
    • Name: Enter a descriptive name (for example, Google SecOps Integration).
    • Description: Optional description.
  6. Click Save.
  7. Click + Token to create a new token.
  8. Provide the following configuration details:
    • Name: Enter Chronicle Feed Token.
    • Set Expiration: Select an appropriate expiration time or No Expiration (for production use).
  9. Click Generate Token.
  10. Copy and save the token value securely. This token will be used as the x-api-key header value.

Get the Harness Account ID

  1. In the Harness Platform, note the Account ID from the URL.
    • Example URL: https://app.harness.io/ng/account/YOUR_ACCOUNT_ID/...
    • The YOUR_ACCOUNT_ID portion is your Account Identifier.
  2. Alternatively, go to Account Settings > Overview to view your Account Identifier.
  3. Copy and save the Account ID for use in the Lambda function.
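The token is sent as the x-api-key header and the Account ID as a query parameter on every audit API call. The following is a minimal sketch of how such a request can be assembled; the placeholder values and the build_audit_request helper are illustrative, and no network call is made:

```python
import urllib.parse

# Placeholder values -- substitute the token and Account ID saved above
API_KEY = "pat.EXAMPLE.token"      # used as the x-api-key header value
ACCOUNT_ID = "YOUR_ACCOUNT_ID"     # from the Harness URL or Account Settings
API_BASE = "https://app.harness.io"

def build_audit_request(page_size: int = 50, page_token: str = None):
    """Build the URL and headers for the audit listV2 endpoint."""
    url = (
        f"{API_BASE}/audit/api/audits/listV2"
        f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
        f"&pageSize={page_size}"
    )
    if page_token:
        # Page tokens can contain characters that need URL encoding
        url += f"&pageToken={urllib.parse.quote(page_token)}"
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    return url, headers

url, headers = build_audit_request()
print(url)
```

The Lambda function later in this document builds its requests the same way, reading the token and Account ID from environment variables instead of constants.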

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for future reference (for example, harness-io-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.
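The Lambda function configured later writes audit batches into this bucket under a date-partitioned prefix. The following is a small sketch of that key layout; object_key is a hypothetical helper and the fixed timestamp is for illustration only:

```python
from datetime import datetime, timezone

PREFIX = "harness/audit"  # assumed prefix; matches the Lambda's S3_PREFIX default

def object_key(ts: datetime, file_id: str) -> str:
    """Build a date-partitioned JSONL object key for one collection run."""
    return (
        f"{PREFIX}/{ts:%Y/%m/%d}/"
        f"harness-audit-{ts:%Y%m%d-%H%M%S}-{file_id}.jsonl"
    )

ts = datetime(2024, 1, 15, 9, 30, 0, tzinfo=timezone.utc)
print(object_key(ts, "example"))
# harness/audit/2024/01/15/harness-audit-20240115-093000-example.jsonl
```

Partitioning keys by date keeps per-prefix object counts manageable and makes it easy to scope the feed or lifecycle rules to a date range.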

Configure the IAM policy and role for Lambda S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Copy and paste the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutHarnessObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"
          }
        ]
      }

    • Replace harness-io-logs if you entered a different bucket name.
  3. Click Next.

  4. Name the policy HarnessToS3Policy and click Create policy.

  5. Go to IAM > Roles > Create role.

  6. Select AWS service as the trusted entity type.

  7. Select Lambda as the use case.

  8. Click Next.

  9. Search for and select the following policies:

    • HarnessToS3Policy (the policy you just created)
    • AWSLambdaBasicExecutionRole (for CloudWatch Logs)
  10. Click Next.

  11. Name the role HarnessAuditLambdaRole and click Create role.
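To catch copy-paste typos before the console rejects the policy, the document can be sanity-checked locally. The following is a quick sketch, assuming the harness-io-logs bucket name used above:

```python
import json

# The same policy document pasted into the IAM console
policy = """
{
  "Version": "2012-10-17",
  "Statement": [
    {"Sid": "AllowPutHarnessObjects", "Effect": "Allow",
     "Action": "s3:PutObject",
     "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"},
    {"Sid": "AllowGetStateObject", "Effect": "Allow",
     "Action": "s3:GetObject",
     "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/state.json"}
  ]
}
"""

doc = json.loads(policy)  # raises ValueError if the JSON is malformed
actions = {s["Action"] for s in doc["Statement"]}
print(actions)
```

The Lambda needs s3:PutObject for both the log objects and the state file (the harness/audit/* resource covers state.json), plus s3:GetObject to read the checkpoint back on the next run.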

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting Value
    Name harness-audit-to-s3
    Runtime Python 3.13
    Architecture x86_64
    Execution role HarnessAuditLambdaRole
  4. Click Create function.

  5. After the function is created, open the Code tab.

  6. Delete the default stub code and enter the following Lambda function code:

    • Lambda function code (harness_audit_to_s3.py)

        #!/usr/bin/env python3
        """
        Harness.io Audit Logs to S3 Lambda
        Fetches audit logs from Harness API and writes to S3 for Chronicle ingestion.
        """
        import os
        import json
        import time
        import uuid
        import logging
        import urllib.parse
        from datetime import datetime, timedelta, timezone
        from urllib.request import Request, urlopen
        from urllib.error import HTTPError, URLError

        import boto3

        # Configuration from Environment Variables
        API_BASE = os.environ.get("HARNESS_API_BASE", "https://app.harness.io").rstrip("/")
        ACCOUNT_ID = os.environ["HARNESS_ACCOUNT_ID"]
        API_KEY = os.environ["HARNESS_API_KEY"]
        BUCKET = os.environ["S3_BUCKET"]
        PREFIX = os.environ.get("S3_PREFIX", "harness/audit").strip("/")
        STATE_KEY = os.environ.get("STATE_KEY", "harness/audit/state.json")
        PAGE_SIZE = min(int(os.environ.get("PAGE_SIZE", "50")), 100)
        START_MINUTES_BACK = int(os.environ.get("START_MINUTES_BACK", "60"))

        # Optional filters
        FILTER_MODULES = os.environ.get("FILTER_MODULES", "").split(",") if os.environ.get("FILTER_MODULES") else None
        FILTER_ACTIONS = os.environ.get("FILTER_ACTIONS", "").split(",") if os.environ.get("FILTER_ACTIONS") else None
        STATIC_FILTER = os.environ.get("STATIC_FILTER")  # e.g., "EXCLUDE_LOGIN_EVENTS"
        MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))

        # AWS clients
        s3 = boto3.client("s3")

        # HTTP headers for Harness API
        HDRS = {
            "x-api-key": API_KEY,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        # Logging configuration
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)


        # ============================================
        # State Management Functions
        # ============================================
        def _read_state():
            """Read checkpoint state from S3."""
            try:
                obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
                state = json.loads(obj["Body"].read())
                since_ms = state.get("since")
                page_token = state.get("pageToken")
                logger.info(f"State loaded: since={since_ms}, pageToken={page_token}")
                return since_ms, page_token
            except s3.exceptions.NoSuchKey:
                logger.info("No state file found, starting fresh collection")
                start_time = datetime.now(timezone.utc) - timedelta(minutes=START_MINUTES_BACK)
                since_ms = int(start_time.timestamp() * 1000)
                logger.info(f"Initial since timestamp: {since_ms} ({start_time.isoformat()})")
                return since_ms, None
            except Exception as e:
                logger.error(f"Error reading state: {e}")
                raise


        def _write_state(since_ms: int, page_token: str = None):
            """Write checkpoint state to S3."""
            state = {
                "since": since_ms,
                "pageToken": page_token,
                "lastRun": int(time.time() * 1000),
                "lastRunISO": datetime.now(timezone.utc).isoformat()
            }
            try:
                s3.put_object(
                    Bucket=BUCKET,
                    Key=STATE_KEY,
                    Body=json.dumps(state, indent=2).encode(),
                    ContentType="application/json"
                )
                logger.info(f"State saved: since={since_ms}, pageToken={page_token}")
            except Exception as e:
                logger.error(f"Error writing state: {e}")
                raise


        # ============================================
        # Harness API Functions
        # ============================================
        def _fetch_harness_audits(since_ms: int, page_token: str = None, retry_count: int = 0):
            """
            Fetch audit logs from Harness API with retry logic.
            API Endpoint: POST /audit/api/audits/listV2
            Documentation: https://apidocs.harness.io/audit/getauditeventlistv2
            """
            try:
                # Build URL with query parameters
                url = (
                    f"{API_BASE}/audit/api/audits/listV2"
                    f"?accountIdentifier={urllib.parse.quote(ACCOUNT_ID)}"
                    f"&pageSize={PAGE_SIZE}"
                )
                if page_token:
                    url += f"&pageToken={urllib.parse.quote(page_token)}"
                logger.info(f"Fetching from: {url[:100]}...")

                # Build request body with time filter and optional filters
                body_data = {
                    "startTime": since_ms,
                    "endTime": int(time.time() * 1000),
                    "filterType": "Audit"
                }
                if FILTER_MODULES:
                    body_data["modules"] = [m.strip() for m in FILTER_MODULES if m.strip()]
                    logger.info(f"Applying module filter: {body_data['modules']}")
                if FILTER_ACTIONS:
                    body_data["actions"] = [a.strip() for a in FILTER_ACTIONS if a.strip()]
                    logger.info(f"Applying action filter: {body_data['actions']}")
                if STATIC_FILTER:
                    body_data["staticFilter"] = STATIC_FILTER
                    logger.info(f"Applying static filter: {STATIC_FILTER}")
                logger.debug(f"Request body: {json.dumps(body_data)}")

                # Make POST request
                req = Request(
                    url,
                    data=json.dumps(body_data).encode('utf-8'),
                    headers=HDRS,
                    method="POST"
                )
                resp = urlopen(req, timeout=30)
                resp_text = resp.read().decode('utf-8')
                resp_data = json.loads(resp_text)

                if "status" not in resp_data:
                    logger.warning(f"Response missing 'status' field: {resp_text[:200]}")

                # Check response status
                if resp_data.get("status") != "SUCCESS":
                    error_msg = resp_data.get("message", "Unknown error")
                    raise Exception(f"API returned status: {resp_data.get('status')} - {error_msg}")

                # Extract data from response structure
                data_obj = resp_data.get("data", {})
                if not data_obj:
                    logger.warning("Response 'data' object is empty or missing")
                events = data_obj.get("content", [])
                has_next = data_obj.get("hasNext", False)
                next_token = data_obj.get("pageToken")
                logger.info(f"API response: {len(events)} events, hasNext={has_next}, pageToken={next_token}")
                if not events and data_obj:
                    logger.info(f"Empty events but data present. Data keys: {list(data_obj.keys())}")
                return {"events": events, "hasNext": has_next, "pageToken": next_token}

            except HTTPError as e:
                error_body = e.read().decode() if hasattr(e, 'read') else ''
                if e.code == 401:
                    logger.error("Authentication failed: Invalid API key")
                    raise Exception("Invalid Harness API key. Check HARNESS_API_KEY environment variable.")
                elif e.code == 403:
                    logger.error("Authorization failed: Insufficient permissions")
                    raise Exception("API key lacks required audit:read permissions")
                elif e.code == 429:
                    retry_after = int(e.headers.get("Retry-After", "60"))
                    logger.warning(f"Rate limit exceeded. Retry after {retry_after} seconds (attempt {retry_count + 1}/{MAX_RETRIES})")
                    if retry_count < MAX_RETRIES:
                        logger.info(f"Waiting {retry_after} seconds before retry...")
                        time.sleep(retry_after)
                        logger.info(f"Retrying request (attempt {retry_count + 2}/{MAX_RETRIES})")
                        return _fetch_harness_audits(since_ms, page_token, retry_count + 1)
                    else:
                        raise Exception(f"Max retries ({MAX_RETRIES}) exceeded for rate limiting")
                elif e.code == 400:
                    logger.error(f"Bad request: {error_body}")
                    raise Exception(f"Invalid request parameters: {error_body}")
                else:
                    logger.error(f"HTTP {e.code}: {e.reason} - {error_body}")
                    raise Exception(f"Harness API error {e.code}: {e.reason}")
            except URLError as e:
                logger.error(f"Network error: {e.reason}")
                raise Exception(f"Failed to connect to Harness API: {e.reason}")
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON response: {e}")
                logger.error(f"Response text (first 500 chars): {resp_text[:500] if 'resp_text' in locals() else 'N/A'}")
                raise Exception("Harness API returned invalid JSON")
            except Exception as e:
                logger.error(f"Unexpected error in _fetch_harness_audits: {e}", exc_info=True)
                raise


        # ============================================
        # S3 Upload Functions
        # ============================================
        def _upload_to_s3(events: list) -> str:
            """
            Upload audit events to S3 in JSONL format.
            Each line is a complete JSON object (one event per line).
            """
            if not events:
                logger.info("No events to upload")
                return None
            try:
                # Create JSONL content (one JSON object per line)
                jsonl_lines = [json.dumps(event) for event in events]
                jsonl_content = "\n".join(jsonl_lines)

                # Generate S3 key with timestamp and UUID
                timestamp = datetime.now(timezone.utc)
                key = (
                    f"{PREFIX}/"
                    f"{timestamp:%Y/%m/%d}/"
                    f"harness-audit-{timestamp:%Y%m%d-%H%M%S}-{uuid.uuid4()}.jsonl"
                )

                # Upload to S3
                s3.put_object(
                    Bucket=BUCKET,
                    Key=key,
                    Body=jsonl_content.encode('utf-8'),
                    ContentType="application/x-ndjson",
                    Metadata={
                        "event-count": str(len(events)),
                        "source": "harness-audit-lambda",
                        "collection-time": timestamp.isoformat()
                    }
                )
                logger.info(f"Uploaded {len(events)} events to s3://{BUCKET}/{key}")
                return key
            except Exception as e:
                logger.error(f"Error uploading to S3: {e}", exc_info=True)
                raise


        # ============================================
        # Main Orchestration Function
        # ============================================
        def fetch_and_store():
            """
            Main function to fetch audit logs from Harness and store in S3.
            Handles pagination and state management.
            """
            logger.info("=== Harness Audit Collection Started ===")
            logger.info(f"Configuration: API_BASE={API_BASE}, ACCOUNT_ID={ACCOUNT_ID[:8]}..., PAGE_SIZE={PAGE_SIZE}")
            if FILTER_MODULES:
                logger.info(f"Module filter enabled: {FILTER_MODULES}")
            if FILTER_ACTIONS:
                logger.info(f"Action filter enabled: {FILTER_ACTIONS}")
            if STATIC_FILTER:
                logger.info(f"Static filter enabled: {STATIC_FILTER}")
            try:
                # Step 1: Read checkpoint state
                since_ms, page_token = _read_state()
                if page_token:
                    logger.info("Resuming pagination from saved pageToken")
                else:
                    since_dt = datetime.fromtimestamp(since_ms / 1000, tz=timezone.utc)
                    logger.info(f"Starting new collection from: {since_dt.isoformat()}")

                # Step 2: Collect all events with pagination
                all_events = []
                current_page_token = page_token
                page_count = 0
                max_pages = 100  # Safety limit
                has_next = True
                while has_next and page_count < max_pages:
                    page_count += 1
                    logger.info(f"--- Fetching page {page_count} ---")

                    # Fetch one page of results
                    result = _fetch_harness_audits(since_ms, current_page_token)

                    # Extract events
                    events = result.get("events", [])
                    all_events.extend(events)
                    logger.info(f"Page {page_count}: {len(events)} events (total: {len(all_events)})")

                    # Check pagination status
                    has_next = result.get("hasNext", False)
                    current_page_token = result.get("pageToken")
                    if not has_next:
                        logger.info("Pagination complete (hasNext=False)")
                        break
                    if not current_page_token:
                        logger.warning("hasNext=True but no pageToken, stopping pagination")
                        break

                    # Small delay between pages to avoid rate limiting
                    time.sleep(0.5)

                if page_count >= max_pages:
                    logger.warning(f"Reached max pages limit ({max_pages}), stopping")

                # Step 3: Upload collected events to S3
                if all_events:
                    s3_key = _upload_to_s3(all_events)
                    logger.info(f"Successfully uploaded {len(all_events)} total events")
                else:
                    logger.info("No new events to upload")
                    s3_key = None

                # Step 4: Update checkpoint state
                if not has_next:
                    # Pagination complete - update since to current time for next run
                    new_since = int(time.time() * 1000)
                    _write_state(new_since, None)
                    logger.info(f"Pagination complete, state updated with new since={new_since}")
                else:
                    # Pagination incomplete - save pageToken for continuation
                    _write_state(since_ms, current_page_token)
                    logger.info("Pagination incomplete, saved pageToken for next run")

                # Step 5: Return result
                result = {
                    "statusCode": 200,
                    "message": "Success",
                    "eventsCollected": len(all_events),
                    "pagesProcessed": page_count,
                    "paginationComplete": not has_next,
                    "s3Key": s3_key,
                    "filters": {
                        "modules": FILTER_MODULES,
                        "actions": FILTER_ACTIONS,
                        "staticFilter": STATIC_FILTER
                    }
                }
                logger.info(f"Collection completed: {json.dumps(result)}")
                return result
            except Exception as e:
                logger.error(f"Collection failed: {e}", exc_info=True)
                result = {
                    "statusCode": 500,
                    "message": "Error",
                    "error": str(e),
                    "errorType": type(e).__name__
                }
                return result
            finally:
                logger.info("=== Harness Audit Collection Finished ===")


        # ============================================
        # Lambda Handler
        # ============================================
        def lambda_handler(event, context):
            """AWS Lambda handler function."""
            return fetch_and_store()


        # ============================================
        # Local Testing
        # ============================================
        if __name__ == "__main__":
            # For local testing
            result = lambda_handler(None, None)
            print(json.dumps(result, indent=2))
      
  7. Click Deploy to save the function code.

Configure Lambda environment variables

  1. In the Lambda function page, select the Configuration tab.
  2. Click Environment variables in the left sidebar.
  3. Click Edit.
  4. Click Add environment variable for each of the following:

    Required environment variables:

    • HARNESS_ACCOUNT_ID: Your Harness Account ID (the account identifier from Harness).
    • HARNESS_API_KEY: Your API key token (a token with audit:read permissions).
    • S3_BUCKET: harness-io-logs (the S3 bucket name).
    • S3_PREFIX: harness/audit (the prefix for S3 objects).
    • STATE_KEY: harness/audit/state.json (the state file path in S3).

    Optional environment variables (shown with their default values):

    • HARNESS_API_BASE: https://app.harness.io (the Harness API base URL).
    • PAGE_SIZE: 50 (events per page; maximum 100).
    • START_MINUTES_BACK: 60 (initial lookback period in minutes).
    • FILTER_MODULES: none (comma-separated modules, for example CD,CI,CE).
    • FILTER_ACTIONS: none (comma-separated actions, for example CREATE,UPDATE,DELETE).
    • STATIC_FILTER: none (predefined filter: EXCLUDE_LOGIN_EVENTS or EXCLUDE_SYSTEM_EVENTS).
    • MAX_RETRIES: 3 (maximum retry attempts for rate limiting).
  5. Click Save.
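
    The function is expected to read these variables at startup. The following is a minimal sketch of how the optional values might be parsed with the defaults from the table above; load_config is a hypothetical helper for illustration, not the verbatim function code:

    ```python
    import os

    def load_config(env=None):
        """Read optional settings from the environment, applying the documented defaults."""
        env = os.environ if env is None else env
        return {
            "api_base": env.get("HARNESS_API_BASE", "https://app.harness.io"),
            # The Harness API caps page size at 100, so clamp anything larger
            "page_size": min(int(env.get("PAGE_SIZE", "50")), 100),
            "start_minutes_back": int(env.get("START_MINUTES_BACK", "60")),
            # Comma-separated lists become None when unset
            "filter_modules": env["FILTER_MODULES"].split(",") if env.get("FILTER_MODULES") else None,
            "filter_actions": env["FILTER_ACTIONS"].split(",") if env.get("FILTER_ACTIONS") else None,
            "static_filter": env.get("STATIC_FILTER") or None,
            "max_retries": int(env.get("MAX_RETRIES", "3")),
        }
    ```

    Clamping PAGE_SIZE locally avoids API errors if a value above the documented maximum of 100 is configured.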

Configure Lambda timeout and memory

  1. In the Lambda function page, select the Configuration tab.
  2. Click General configuration in the left sidebar.
  3. Click Edit.
  4. Provide the following configuration details:
    • Memory: 256 MB (recommended)
    • Timeout: 5 min 0 sec (300 seconds)
  5. Click Save.

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Schedule name: Enter harness-audit-hourly .
    • Description: Optional description.
  3. Click Next.
  4. Under Schedule pattern, select Recurring schedule.
  5. Select Rate-based schedule.
  6. Provide the following configuration details:
    • Rate expression: Enter 1 hour .
  7. Click Next.
  8. Under Target, provide the following configuration details:
    • Target API: Select AWS Lambda Invoke.
    • Lambda function: Select your function harness-audit-to-s3 .
  9. Click Next.
  10. Review the schedule configuration.
  11. Click Create schedule.
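
The console steps above can also be expressed programmatically. This sketch only assembles the request for the EventBridge Scheduler create_schedule call; the function ARN and the execution role ARN (a role that allows Scheduler to invoke the Lambda) are placeholders you must supply:

```python
def build_schedule_request(function_arn, role_arn, rate_hours=1):
    """Assemble kwargs for boto3's scheduler create_schedule call."""
    unit = "hour" if rate_hours == 1 else "hours"
    return {
        "Name": "harness-audit-hourly",
        "ScheduleExpression": f"rate({rate_hours} {unit})",
        # No flexible invocation window - fire exactly on schedule
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {
            "Arn": function_arn,
            "RoleArn": role_arn,
        },
    }

# To create the schedule:
# boto3.client("scheduler").create_schedule(**build_schedule_request(fn_arn, role_arn))
```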

Create read-only IAM user for Google SecOps

This IAM user allows Google SecOps to read logs from the S3 bucket.

  1. Go to AWS Console > IAM > Users > Create user.
  2. Provide the following configuration details:
    • User name: Enter chronicle-s3-reader .
  3. Click Next.
  4. Select Attach policies directly.
  5. Click Create policy.
  6. Select the JSON tab.
  7. Paste the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"
          },
          {
            "Effect": "Allow",
            "Action": [
              "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::harness-io-logs",
            "Condition": {
              "StringLike": {
                "s3:prefix": "harness/audit/*"
              }
            }
          }
        ]
      }
    
  8. Click Next.

  9. Name the policy ChronicleHarnessS3ReadPolicy .

  10. Click Create policy.

  11. Return to the user creation tab and refresh the policies list.

  12. Search for and select ChronicleHarnessS3ReadPolicy .

  13. Click Next.

  14. Review and click Create user.
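
Before attaching the policy, you can sanity-check that it grants only the two S3 permissions Google SecOps needs. A quick standalone check (the allowed_actions helper is illustrative, not an AWS API):

```python
import json

# The policy from the steps above, pasted verbatim for inspection
POLICY = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {"Effect": "Allow",
     "Action": ["s3:GetObject"],
     "Resource": "arn:aws:s3:::harness-io-logs/harness/audit/*"},
    {"Effect": "Allow",
     "Action": ["s3:ListBucket"],
     "Resource": "arn:aws:s3:::harness-io-logs",
     "Condition": {"StringLike": {"s3:prefix": "harness/audit/*"}}}
  ]
}
""")

def allowed_actions(policy):
    """Collect every action granted by the policy's Allow statements."""
    actions = set()
    for stmt in policy["Statement"]:
        if stmt["Effect"] == "Allow":
            actions.update(stmt["Action"])
    return actions
```

The ListBucket statement is scoped with a StringLike condition on s3:prefix, so the reader user can enumerate only objects under harness/audit/.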

Create access keys for the reader user

  1. In the IAM Users page, select the chronicle-s3-reader user.
  2. Select the Security credentials tab.
  3. Click Create access key.
  4. Select Third-party service as the use case.
  5. Click Next.
  6. Optional: Add a description tag.
  7. Click Create access key.
  8. Click Download CSV file to save the Access Key ID and Secret Access Key.
  9. Click Done.

Configure a feed in Google SecOps to ingest Harness IO logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add new.
  3. On the next page, click Configure a single feed.
  4. In the Feed name field, enter a name for the feed (for example, Harness Audit Logs).
  5. Select Amazon S3 V2 as the Source type.
  6. Select Harness IO as the Log type.
  7. Click Next.
  8. Specify values for the following input parameters:

    • S3 URI: Enter the S3 bucket URI with the prefix path: s3://harness-io-logs/harness/audit/
    • Source deletion option: Select the deletion option according to your preference:

      • Never: Never deletes any files after transfers (recommended initially).
      • On success: Deletes all files and empty directories after successful transfer.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.

    • Access Key ID: Enter the Access Key ID from the chronicle-s3-reader user.

    • Secret Access Key: Enter the Secret Access Key from the chronicle-s3-reader user.

    • Asset namespace: The asset namespace. Enter harness.audit.

    • Ingestion labels: Optional labels to be applied to events from this feed.

  9. Click Next.

  10. Review your new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.
