Collect CrowdStrike IDP Services logs

This document explains how to ingest CrowdStrike Identity Protection (IDP) Services logs to Google Security Operations using Amazon S3. The integration uses the CrowdStrike Unified Alerts API to collect Identity Protection events and stores them in NDJSON format for processing by the built-in CS_IDP parser.
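For orientation, NDJSON (newline-delimited JSON) places one compact JSON object on each line. The following is a minimal sketch of that layout; the sample records and their fields are hypothetical placeholders, not the real alert schema.

```python
import json

def to_ndjson(records):
    """Serialize a list of dicts as NDJSON: one compact JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)

def from_ndjson(text):
    """Parse NDJSON text back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# Hypothetical sample records; real alerts carry many more fields.
sample = [
    {"composite_id": "example-1", "product": "idp"},
    {"composite_id": "example-2", "product": "idp"},
]
ndjson_text = to_ndjson(sample)
```

Because each line is a complete JSON document, a consumer can process the file one line at a time without loading it whole.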

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance.
  • Privileged access to the CrowdStrike Falcon Console and API key management.
  • Privileged access to AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Get CrowdStrike Identity Protection prerequisites

  1. Sign in to the CrowdStrike Falcon Console.
  2. Go to Support and Resources > API clients and keys.
  3. Click Add new API Client.
  4. Provide the following configuration details:
    • Client Name: Enter Google SecOps IDP Integration.
    • Description: Enter API client for Google SecOps integration.
    • Scopes: Select the Alerts: READ (alerts:read) scope (this includes Identity Protection alerts).
  5. Click Add.
  6. Copy the following details and save them in a secure location:
    • Client ID
    • Client Secret (this is only shown once)
    • Base URL (examples: api.crowdstrike.com for US-1, api.us-2.crowdstrike.com for US-2, api.eu-1.crowdstrike.com for EU-1)
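The values saved above are what the collector later exchanges for an OAuth2 bearer token. As a rough sketch, the token request can be assembled as follows. `build_token_request` is a hypothetical helper name; the `/oauth2/token` path and the form fields are taken from the Lambda code in this document. URL-encoding the credentials is an assumption added here to be safe if a secret contains reserved characters.

```python
from urllib.parse import urlencode

def build_token_request(api_base, client_id, client_secret):
    """Return (url, body, headers) for a client-credentials token request.

    Hypothetical helper: it only builds the request, it does not send it.
    """
    url = f"https://{api_base}/oauth2/token"
    # urlencode escapes reserved characters such as '&' and '=' in the secret.
    body = urlencode({
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    })
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return url, body, headers

# Placeholder credentials for illustration only.
url, body, headers = build_token_request("api.crowdstrike.com", "my-id", "s3cr&t=")
```

The Base URL you saved determines the host part of every later API call, so use the one that matches your CrowdStrike cloud.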

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, crowdstrike-idp-logs-bucket).
  3. Create a user following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download .CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Copy and paste the following policy.
  4. Policy JSON (replace crowdstrike-idp-logs-bucket if you entered a different bucket name):

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/crowdstrike-idp/state.json"
          }
        ]
      }
    
  5. Click Next > Create policy.

  6. Go to IAM > Roles > Create role > AWS service > Lambda.

  7. Attach the newly created policy.

  8. Name the role CrowdStrike-IDP-Lambda-Role and click Create role.
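If your bucket name or state key differs from the examples, the policy in this section can be generated rather than edited by hand. This is a minimal sketch, assuming a hypothetical helper named `build_lambda_policy`; the two statements mirror the policy JSON in this section.

```python
import json

def build_lambda_policy(bucket, state_key="crowdstrike-idp/state.json"):
    """Build the Lambda upload policy for a given bucket (hypothetical helper)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the function write log files and the state file.
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                # Lets the function read back only its checkpoint file.
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }

policy = build_lambda_policy("crowdstrike-idp-logs-bucket")
policy_json = json.dumps(policy, indent=2)  # paste this into the JSON tab
```

Keep the `state_key` argument in sync with the STATE_KEY environment variable of the function, otherwise the checkpoint read is not covered by the second statement.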

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting          Value
    Name             CrowdStrike-IDP-Collector
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   CrowdStrike-IDP-Lambda-Role
  4. After the function is created, open the Code tab, delete the stub, and paste the following code:

      import json
      import boto3
      import urllib3
      import os
      from datetime import datetime, timezone
      from urllib.parse import urlencode

      HTTP = urllib3.PoolManager()

      def lambda_handler(event, context):
          """
          Fetch CrowdStrike Identity Protection alerts (Unified Alerts API)
          and store RAW JSON (NDJSON) to S3 for the CS_IDP parser.
          No transformation is performed.
          """
          # Environment variables
          s3_bucket = os.environ['S3_BUCKET']
          s3_prefix = os.environ['S3_PREFIX']
          state_key = os.environ['STATE_KEY']
          client_id = os.environ['CROWDSTRIKE_CLIENT_ID']
          client_secret = os.environ['CROWDSTRIKE_CLIENT_SECRET']
          api_base = os.environ['API_BASE']

          s3 = boto3.client('s3')
          token = get_token(client_id, client_secret, api_base)
          last_ts = get_last_timestamp(s3, s3_bucket, state_key)

          # FQL filter for Identity Protection alerts only, newer than checkpoint
          fql_filter = f"product:'idp'+updated_timestamp:>'{last_ts}'"
          sort = 'updated_timestamp.asc'

          # Step 1: Get list of alert IDs
          all_ids = []
          per_page = int(os.environ.get('ALERTS_LIMIT', '1000'))  # up to 10000 per SDK docs
          offset = 0
          while True:
              page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset)
              if not page_ids:
                  break
              all_ids.extend(page_ids)
              if len(page_ids) < per_page:
                  break
              offset += per_page

          if not all_ids:
              return {'statusCode': 200, 'body': 'No new Identity Protection alerts.'}

          # Step 2: Get alert details in batches (max 1000 IDs per request)
          details = []
          max_batch = 1000
          for i in range(0, len(all_ids), max_batch):
              batch = all_ids[i:i + max_batch]
              details.extend(fetch_alert_details(api_base, token, batch))

          if details:
              details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', '')))
              latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp')
              key = f"{s3_prefix}cs_idp_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
              body = '\n'.join(json.dumps(d, separators=(',', ':')) for d in details)
              s3.put_object(
                  Bucket=s3_bucket,
                  Key=key,
                  Body=body.encode('utf-8'),
                  ContentType='application/x-ndjson'
              )
              update_state(s3, s3_bucket, state_key, latest)

          return {'statusCode': 200, 'body': f'Wrote {len(details)} alerts to S3.'}

      def get_token(client_id, client_secret, api_base):
          """Get OAuth2 token from CrowdStrike API"""
          url = f"https://{api_base}/oauth2/token"
          data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials"
          headers = {'Content-Type': 'application/x-www-form-urlencoded'}
          r = HTTP.request('POST', url, body=data, headers=headers)
          if r.status != 200:
              raise Exception(f'Auth failed: {r.status} {r.data}')
          return json.loads(r.data.decode('utf-8'))['access_token']

      def query_alert_ids(api_base, token, fql_filter, sort, limit, offset):
          """Query alert IDs using filters"""
          url = f"https://{api_base}/alerts/queries/alerts/v2"
          params = {'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset)}
          qs = urlencode(params)
          r = HTTP.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'})
          if r.status != 200:
              raise Exception(f'Query alerts failed: {r.status} {r.data}')
          resp = json.loads(r.data.decode('utf-8'))
          return resp.get('resources', [])

      def fetch_alert_details(api_base, token, composite_ids):
          """Fetch detailed alert data by composite IDs"""
          url = f"https://{api_base}/alerts/entities/alerts/v2"
          body = {'composite_ids': composite_ids}
          headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
          r = HTTP.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers)
          if r.status != 200:
              raise Exception(f'Fetch alert details failed: {r.status} {r.data}')
          resp = json.loads(r.data.decode('utf-8'))
          return resp.get('resources', [])

      def get_last_timestamp(s3, bucket, key, default='2023-01-01T00:00:00Z'):
          """Get last processed timestamp from S3 state file"""
          try:
              obj = s3.get_object(Bucket=bucket, Key=key)
              state = json.loads(obj['Body'].read().decode('utf-8'))
              return state.get('last_timestamp', default)
          except s3.exceptions.NoSuchKey:
              return default

      def update_state(s3, bucket, key, ts):
          """Update last processed timestamp in S3 state file"""
          state = {'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat()}
          s3.put_object(
              Bucket=bucket,
              Key=key,
              Body=json.dumps(state).encode('utf-8'),
              ContentType='application/json'
          )
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the environment variables provided in the following table, replacing the example values with your values.

    Environment variables

    Key                         Example value
    S3_BUCKET                   crowdstrike-idp-logs-bucket
    S3_PREFIX                   crowdstrike-idp/
    STATE_KEY                   crowdstrike-idp/state.json
    CROWDSTRIKE_CLIENT_ID       <your-client-id>
    CROWDSTRIKE_CLIENT_SECRET   <your-client-secret>
    API_BASE                    api.crowdstrike.com (US-1), api.us-2.crowdstrike.com (US-2), api.eu-1.crowdstrike.com (EU-1)
    ALERTS_LIMIT                1000 (optional, max 10000 per page)
  8. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
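The collector's two-step flow (page through alert IDs, then fetch details in batches) can be exercised locally without calling CrowdStrike. The sketch below is an illustration only: `build_filter`, `collect_ids`, `chunked`, and `fake_query` are hypothetical names, and the fake page function stands in for the real query call. The filter string and the stop conditions follow the Lambda code in this section.

```python
def build_filter(last_ts):
    """FQL filter: Identity Protection alerts updated after the checkpoint."""
    return f"product:'idp'+updated_timestamp:>'{last_ts}'"

def collect_ids(query_page, per_page):
    """Offset pagination that stops on an empty page or a short page."""
    ids, offset = [], 0
    while True:
        page = query_page(per_page, offset)
        if not page:
            break
        ids.extend(page)
        if len(page) < per_page:
            break
        offset += per_page
    return ids

def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Stand-in for the API: 2500 fake alert IDs served by limit and offset.
FAKE_IDS = [f"id-{n}" for n in range(2500)]

def fake_query(limit, offset):
    return FAKE_IDS[offset:offset + limit]

all_ids = collect_ids(fake_query, per_page=1000)
batches = chunked(all_ids, 1000)  # one detail request per batch
```

With 2500 fake IDs and a page size of 1000, the loop makes three page requests and the detail step would make three batch requests, which is a quick way to sanity-check an ALERTS_LIMIT value against the 300-second timeout.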

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (15 minutes).
    • Target: your Lambda function CrowdStrike-IDP-Collector.
    • Name: CrowdStrike-IDP-Collector-15m.
  3. Click Create schedule.
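EventBridge Scheduler writes a recurring rate as a `rate(value unit)` expression, with a singular unit when the value is 1. As a small sketch (the helper name `rate_expression` is hypothetical), the 15-minute schedule above corresponds to:

```python
VALID_UNITS = ("minute", "hour", "day")

def rate_expression(value, unit):
    """Build a rate expression string; the unit is singular when value is 1."""
    singular = unit.rstrip("s")
    if value < 1 or singular not in VALID_UNITS:
        raise ValueError(f"unsupported rate: {value} {unit}")
    return f"rate({value} {singular if value == 1 else singular + 's'})"

schedule = rate_expression(15, "minutes")
runs_per_day = (24 * 60) // 15  # invocations per day at this rate
```

At this rate the function runs 96 times a day, and each run only fetches alerts updated since the stored checkpoint.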

(Optional) Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. JSON:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket"
          }
        ]
      }
     
    
  7. Name the policy secops-reader-policy.

  8. Click Create policy > search/select > Next > Add permissions.

  9. Create an access key for secops-reader: Security credentials > Access keys.

  10. Click Create access key.

  11. Download the .CSV file. (You'll paste these values into the feed.)

Configure a feed in Google SecOps to ingest CrowdStrike Identity Protection Services logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, CrowdStrike Identity Protection Services logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Crowdstrike Identity Protection Services as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://crowdstrike-idp-logs-bucket/crowdstrike-idp/
    • Source deletion options: Select deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
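The feed's S3 URI has to point at the same bucket and prefix that the Lambda function writes to. A minimal sketch for deriving it from the S3_BUCKET and S3_PREFIX values used earlier; `feed_s3_uri` is a hypothetical helper, not part of any SDK.

```python
def feed_s3_uri(bucket, prefix):
    """Combine a bucket and key prefix into an s3:// URI ending in a slash."""
    prefix = prefix.strip("/")
    return f"s3://{bucket}/{prefix}/" if prefix else f"s3://{bucket}/"

uri = feed_s3_uri("crowdstrike-idp-logs-bucket", "crowdstrike-idp/")
```

With the example values from this document, the result matches the S3 URI shown in the feed parameters above.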

Need more help? Get answers from Community members and Google SecOps professionals.
