Collect Digital Shadows SearchLight logs

This document explains how to ingest Digital Shadows SearchLight logs to Google Security Operations using Amazon S3. The parser extracts security event data from the JSON logs. It initializes Unified Data Model (UDM) fields, parses the JSON payload, maps relevant fields to the UDM schema, extracts entities like email and hostname using grok patterns, and constructs the security_result and metadata objects within the UDM event.
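
The collector described in this guide writes newline-delimited JSON (NDJSON) to S3, one record per line, with a _source_type field added to distinguish incidents, intelligence incidents, and indicators. As an illustration only (field names other than _source_type vary by tenant and record type), a stored record might look like this:

    {"id":"incident-12345","title":"Impersonating domain detected","severity":"high","updated":"2025-01-10T12:00:00+00:00","_source_type":"incident"}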

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance.
  • Privileged access to the Digital Shadows SearchLight tenant.
  • Privileged access to AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Collect Digital Shadows SearchLight prerequisites (IDs, API keys, org IDs, tokens)

  1. Sign in to the Digital Shadows SearchLight Portal.
  2. Go to Settings > API Credentials.
  3. Create a new API client or key pair.
  4. Copy and save the following details in a secure location:
    • API Key
    • API Secret
    • Account ID
    • API Base URL: https://api.searchlight.app/v1 or https://portal-digitalshadows.com/api/v1
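
You can optionally sanity-check the credentials before deploying anything. The following is a minimal Python sketch, not part of the deployment; it assumes your key pair works with HTTP Basic authentication (the same scheme the Lambda function below uses) and that your tenant exposes an incidents endpoint under the base URL you saved:

    import base64
    import urllib3

    API_BASE = "https://api.searchlight.app/v1"  # or your tenant's portal base URL
    API_KEY = "<your-api-key>"
    API_SECRET = "<your-api-secret>"

    # Build the same Basic auth header the collector below uses
    token = base64.b64encode(f"{API_KEY}:{API_SECRET}".encode("utf-8")).decode("utf-8")
    resp = urllib3.PoolManager().request(
        "GET",
        f"{API_BASE}/incidents?limit=1",
        headers={"Authorization": f"Basic {token}", "Accept": "application/json"},
    )
    # A 200 status confirms the credentials and base URL are valid
    print(resp.status, resp.data[:200])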

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for future reference (for example, digital-shadows-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download .CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.
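
Optionally, verify that the new access key can write to the bucket before continuing. A minimal boto3 sketch, assuming the bucket is named digital-shadows-logs:

    import boto3

    # Credentials from the downloaded .CSV file
    s3 = boto3.client(
        "s3",
        aws_access_key_id="<access-key-id>",
        aws_secret_access_key="<secret-access-key>",
    )

    # Write and remove a small test object to confirm access
    s3.put_object(Bucket="digital-shadows-logs", Key="connectivity-test.txt", Body=b"ok")
    s3.delete_object(Bucket="digital-shadows-logs", Key="connectivity-test.txt")
    print("Write access confirmed")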

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Copy and paste the following policy.
  4. Policy JSON (replace digital-shadows-logs if you entered a different bucket name):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::digital-shadows-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows-searchlight/state.json"
        }
      ]
    }

  5. Click Next > Create policy.

  6. Go to IAM > Roles > Create role > AWS service > Lambda.

  7. Attach the newly created policy.

  8. Name the role digital-shadows-lambda-role and click Create role.
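
If you prefer to script these IAM steps instead of using the console, the following boto3 sketch is roughly equivalent. The policy name digital-shadows-lambda-policy is an arbitrary choice for this example; the policy document is the JSON from step 4:

    import json
    import boto3

    iam = boto3.client("iam")

    # Trust policy that lets the Lambda service assume the role
    trust = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }

    # Policy document from step 4 (same content, condensed layout)
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "AllowPutObjects", "Effect": "Allow",
             "Action": "s3:PutObject",
             "Resource": "arn:aws:s3:::digital-shadows-logs/*"},
            {"Sid": "AllowGetStateObject", "Effect": "Allow",
             "Action": "s3:GetObject",
             "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows-searchlight/state.json"},
        ],
    }

    policy = iam.create_policy(
        PolicyName="digital-shadows-lambda-policy",
        PolicyDocument=json.dumps(policy_doc),
    )
    iam.create_role(
        RoleName="digital-shadows-lambda-role",
        AssumeRolePolicyDocument=json.dumps(trust),
    )
    iam.attach_role_policy(
        RoleName="digital-shadows-lambda-role",
        PolicyArn=policy["Policy"]["Arn"],
    )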

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting          Value
    Name             digital-shadows-collector
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   digital-shadows-lambda-role
  4. After the function is created, open the Code tab, delete the stub, and paste the following code (digital-shadows-collector.py).

    import json
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode

    import boto3
    import urllib3

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    HTTP = urllib3.PoolManager(retries=False)


    def _basic_auth_header(key: str, secret: str) -> str:
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"


    def _load_state(s3, bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC)."""
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            state = json.loads(obj["Body"].read().decode("utf-8"))
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except s3.exceptions.NoSuchKey:
            pass
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()


    def _save_state(s3, bucket, key, ts: str) -> None:
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}).encode("utf-8"),
            ContentType="application/json",
        )


    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")


    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items


    def lambda_handler(event, context):
        # Required
        s3_bucket = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]

        # Optional / defaults
        s3_prefix = os.environ.get("S3_PREFIX", "digital-shadows-searchlight/")
        state_key = os.environ.get("STATE_KEY", "digital-shadows-searchlight/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))

        s3 = boto3.client("s3")
        last_ts = _load_state(s3, s3_bucket, state_key)
        logger.info(f"Checkpoint: {last_ts}")

        headers = {
            "Authorization": _basic_auth_header(api_key, api_secret),
            "Accept": "application/json",
            "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
        }

        records = []

        # Incidents (time filter often 'published-after' or 'updated-since' depending on tenancy)
        incidents = _collect(api_base, headers, "incidents", last_ts, account_id,
                             page_size, max_pages, time_param="published-after")
        for incident in incidents:
            incident['_source_type'] = 'incident'
        records.extend(incidents)

        # Intelligence incidents (alerts)
        intel_incidents = _collect(api_base, headers, "intel-incidents", last_ts, account_id,
                                   page_size, max_pages, time_param="published-after")
        for intel in intel_incidents:
            intel['_source_type'] = 'intelligence_incident'
        records.extend(intel_incidents)

        # Indicators (IOCs)
        indicators = _collect(api_base, headers, "indicators", last_ts, account_id,
                              page_size, max_pages, time_param="lastUpdated-after")
        for indicator in indicators:
            indicator['_source_type'] = 'ioc'
        records.extend(indicators)

        if records:
            # Choose newest timestamp seen in this batch
            newest = max(
                (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                for r in records
            )
            key = f"{s3_prefix}digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
            body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records).encode("utf-8")
            s3.put_object(
                Bucket=s3_bucket,
                Key=key,
                Body=body,
                ContentType="application/x-ndjson",
            )
            _save_state(s3, s3_bucket, state_key, newest)
            msg = f"Wrote {len(records)} records to s3://{s3_bucket}/{key}"
        else:
            msg = "No new records"

        logger.info(msg)
        return {"statusCode": 200, "body": msg}

  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the environment variables provided in the following table, replacing the example values with your values.

    Environment variables

    Key             Example value
    S3_BUCKET       digital-shadows-logs
    S3_PREFIX       digital-shadows-searchlight/
    STATE_KEY       digital-shadows-searchlight/state.json
    DS_API_KEY      <your-6-character-api-key>
    DS_API_SECRET   <your-32-character-api-secret>
    API_BASE        https://api.searchlight.app/v1 (or https://portal-digitalshadows.com/api/v1)
    DS_ACCOUNT_ID   <your-account-id> (required for most tenants)
    PAGE_SIZE       100
    MAX_PAGES       10
  8. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
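
Before scheduling the function, you can smoke-test it from the Lambda console with an empty test event ({}). Alternatively, a local sketch like the following works, assuming you saved the code as digital_shadows_collector.py (underscores, so it can be imported) and have AWS credentials available in your shell:

    import os

    # The handler reads its configuration from environment variables
    os.environ.setdefault("S3_BUCKET", "digital-shadows-logs")
    os.environ.setdefault("DS_API_KEY", "<your-api-key>")
    os.environ.setdefault("DS_API_SECRET", "<your-api-secret>")

    from digital_shadows_collector import lambda_handler

    # Expect {'statusCode': 200, 'body': 'Wrote N records ...'} or 'No new records'
    print(lambda_handler({}, None))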

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function digital-shadows-collector.
    • Name: digital-shadows-collector-1h.
  3. Click Create schedule.
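
If you script the schedule instead, a boto3 sketch along these lines should work. Note the scheduler execution role (one that allows scheduler.amazonaws.com to invoke your function) is an assumption here; the console flow above handles it for you:

    import boto3

    scheduler = boto3.client("scheduler")

    scheduler.create_schedule(
        Name="digital-shadows-collector-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            # Both ARNs are placeholders for your account and region
            "Arn": "arn:aws:lambda:<region>:<account-id>:function:digital-shadows-collector",
            "RoleArn": "arn:aws:iam::<account-id>:role/<scheduler-invoke-role>",
        },
    )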

(Optional) Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader .
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::digital-shadows-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::digital-shadows-logs"
        }
      ]
    }

  7. Name = secops-reader-policy.

  8. Click Create policy > search/select > Next > Add permissions.

  9. Create an access key for secops-reader: Security credentials > Access keys.

  10. Click Create access key.

  11. Download the .CSV file. (You'll paste these values into the feed.)
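
To confirm the reader credentials work before configuring the feed, a quick boto3 check against the bucket and prefix used above:

    import boto3

    # Keys from the secops-reader .CSV download
    s3 = boto3.client(
        "s3",
        aws_access_key_id="<secops-reader-access-key-id>",
        aws_secret_access_key="<secops-reader-secret-access-key>",
    )

    # s3:ListBucket and s3:GetObject are the only permissions this user needs
    resp = s3.list_objects_v2(Bucket="digital-shadows-logs", Prefix="digital-shadows-searchlight/")
    for obj in resp.get("Contents", []):
        print(obj["Key"], obj["Size"])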

Configure a feed in Google SecOps to ingest Digital Shadows SearchLight logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Digital Shadows SearchLight logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Digital Shadows SearchLight as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://digital-shadows-logs/digital-shadows-searchlight/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified within the last number of days. The default is 180 days.
    • Access Key ID: The user access key with access to the S3 bucket.
    • Secret Access Key: The user secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.
