Collect Digital Shadows Indicators logs

Supported in: Google SecOps SIEM

This document explains how to ingest Digital Shadows Indicators logs into Google Security Operations using Amazon S3.

Digital Shadows Indicators (now part of ReliaQuest GreyMatter DRP) is a digital risk protection platform that continuously monitors and detects external threats, data exposures, and brand impersonation across the open web, deep web, dark web, and social media. It provides threat intelligence, incident alerts, and indicators of compromise (IOCs) to help organizations identify and mitigate digital risks.

Before you begin

  • A Google SecOps instance
  • Privileged access to the Digital Shadows Indicators portal
  • Privileged access to AWS (S3, IAM)
  • Active subscription to Digital Shadows Indicators with API access enabled

Collect Digital Shadows Indicators API credentials

  1. Sign in to the Digital Shadows Indicators Portal at https://portal-digitalshadows.com.
  2. Go to Settings > API Credentials.
  3. If you don't have an existing API key, click Create New API Client or Generate API Key.
  4. Copy and save the following details in a secure location (a quick way to verify them is sketched after this list):

    • API Key: Your 6-character API key
    • API Secret: Your 32-character API secret
    • Account ID: Your account ID (displayed in the portal or provided by your Digital Shadows representative)
    • API Base URL: https://api.searchlight.app/v1 or https://portal-digitalshadows.com/api/v1 (depending on your tenant region)
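
Optional: before building the AWS pipeline, you can confirm the credentials work. The following is a minimal sketch, assuming the same Basic authentication scheme and incidents endpoint that the collector function later in this document uses; the placeholder values are yours to replace.

    import base64
    import urllib3

    # Placeholders: substitute the values you saved above.
    API_BASE = "https://api.searchlight.app/v1"  # or your regional base URL
    API_KEY = "your-6-character-key"
    API_SECRET = "your-32-character-secret"
    ACCOUNT_ID = "your-account-id"

    # HTTP Basic auth: base64("key:secret").
    token = base64.b64encode(f"{API_KEY}:{API_SECRET}".encode()).decode()
    http = urllib3.PoolManager()
    resp = http.request(
        "GET",
        f"{API_BASE}/incidents?limit=1&account-id={ACCOUNT_ID}",
        headers={"Authorization": f"Basic {token}", "Accept": "application/json"},
    )
    # 200 indicates the key, secret, and account ID are valid.
    print(resp.status)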

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket by following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, digital-shadows-logs).
  3. Create a User by following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download .csv file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions. To confirm the key and policy work end to end, see the write-test sketch after this procedure.
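
Optional: a minimal sketch to confirm the new access key can write to the bucket, assuming boto3 is installed locally; replace the placeholders with the key, secret, bucket name, and region you saved above.

    import boto3

    # Placeholders: use the credentials from the downloaded .csv file.
    s3 = boto3.client(
        "s3",
        aws_access_key_id="YOUR_ACCESS_KEY_ID",
        aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
        region_name="us-east-1",  # your bucket's region
    )
    # Write a small test object under the prefix the collector will use.
    s3.put_object(
        Bucket="digital-shadows-logs",
        Key="digital-shadows/connectivity-test.json",
        Body=b"{}",
        ContentType="application/json",
    )
    print("Write succeeded")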

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Copy and paste the policy below.
  3. Policy JSON (replace digital-shadows-logs if you entered a different bucket name):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::digital-shadows-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::digital-shadows-logs/digital-shadows/state.json"
        }
      ]
    }
    
  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy.

  7. Name the role DigitalShadowsLambdaRole and click Create role. If you prefer to script these steps, see the boto3 sketch after this procedure.
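
The following is a sketch of the same policy and role creation with boto3. The policy name DigitalShadowsS3Policy is a hypothetical choice, and the trust policy is the standard one that lets the Lambda service assume the role; run it with credentials that are allowed to manage IAM.

    import json
    import boto3

    iam = boto3.client("iam")

    # Assumes the policy JSON above is saved locally as policy.json.
    with open("policy.json") as f:
        policy_doc = f.read()

    policy = iam.create_policy(
        PolicyName="DigitalShadowsS3Policy",  # hypothetical name
        PolicyDocument=policy_doc,
    )

    # Standard trust policy allowing the Lambda service to assume this role.
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }
    iam.create_role(
        RoleName="DigitalShadowsLambdaRole",
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    iam.attach_role_policy(
        RoleName="DigitalShadowsLambdaRole",
        PolicyArn=policy["Policy"]["Arn"],
    )
    # Optionally also attach the AWS-managed basic execution policy so the
    # function can write CloudWatch logs:
    iam.attach_role_policy(
        RoleName="DigitalShadowsLambdaRole",
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
    )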

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting          Value
    Name             DigitalShadowsCollector
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   DigitalShadowsLambdaRole
  4. After the function is created, open the Code tab, delete the stub, and paste the code below (DigitalShadowsCollector.py).

    import urllib3
    import json
    import boto3
    import os
    import base64
    import logging
    import time
    from datetime import datetime, timedelta, timezone
    from urllib.parse import urlencode

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Reuse the HTTP pool and S3 client across warm Lambda invocations.
    HTTP = urllib3.PoolManager(retries=False)
    storage_client = boto3.client('s3')


    def _basic_auth_header(key: str, secret: str) -> str:
        """Build the HTTP Basic Authorization header from the API key and secret."""
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("utf-8")
        return f"Basic {token}"


    def _load_state(bucket, key, default_days=30) -> str:
        """Return ISO8601 checkpoint (UTC); fall back to a default lookback."""
        try:
            response = storage_client.get_object(Bucket=bucket, Key=key)
            state_data = response['Body'].read().decode('utf-8')
            state = json.loads(state_data)
            ts = state.get("last_timestamp")
            if ts:
                return ts
        except storage_client.exceptions.NoSuchKey:
            logger.info("No previous state found, starting from default lookback")
        except Exception as e:
            logger.warning(f"State read error: {e}")
        return (datetime.now(timezone.utc) - timedelta(days=default_days)).isoformat()


    def _save_state(bucket, key, ts: str) -> None:
        """Persist the newest record timestamp as the next run's checkpoint."""
        storage_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({"last_timestamp": ts}),
            ContentType="application/json",
        )


    def _get_json(url: str, headers: dict, params: dict, backoff_s=2, max_retries=3) -> dict:
        """GET a JSON resource with exponential backoff on throttling and 5xx errors."""
        qs = f"?{urlencode(params)}" if params else ""
        for attempt in range(max_retries):
            r = HTTP.request("GET", f"{url}{qs}", headers=headers)
            if r.status == 200:
                return json.loads(r.data.decode("utf-8"))
            if r.status in (429, 500, 502, 503, 504):
                wait = backoff_s * (2 ** attempt)
                logger.warning(f"HTTP {r.status} from DS API, retrying in {wait}s")
                time.sleep(wait)
                continue
            raise RuntimeError(f"DS API error {r.status}: {r.data[:200]}")
        raise RuntimeError("Exceeded retry budget for DS API")


    def _collect(api_base, headers, path, since_ts, account_id, page_size, max_pages, time_param):
        """Page through one API endpoint, filtering by the checkpoint timestamp."""
        items = []
        for page in range(max_pages):
            params = {
                "limit": page_size,
                "offset": page * page_size,
                time_param: since_ts,
            }
            if account_id:
                params["account-id"] = account_id
            data = _get_json(f"{api_base}/{path}", headers, params)
            batch = data.get("items") or data.get("data") or []
            if not batch:
                break
            items.extend(batch)
            if len(batch) < page_size:
                break
        return items


    def lambda_handler(event, context):
        bucket_name = os.environ["S3_BUCKET"]
        api_key = os.environ["DS_API_KEY"]
        api_secret = os.environ["DS_API_SECRET"]
        prefix = os.environ.get("S3_PREFIX", "digital-shadows")
        state_key = os.environ.get("STATE_KEY", "digital-shadows/state.json")
        api_base = os.environ.get("API_BASE", "https://api.searchlight.app/v1")
        account_id = os.environ.get("DS_ACCOUNT_ID", "")
        page_size = int(os.environ.get("PAGE_SIZE", "100"))
        max_pages = int(os.environ.get("MAX_PAGES", "10"))

        try:
            last_ts = _load_state(bucket_name, state_key)
            logger.info(f"Checkpoint: {last_ts}")

            headers = {
                "Authorization": _basic_auth_header(api_key, api_secret),
                "Accept": "application/json",
                "User-Agent": "Chronicle-DigitalShadows-S3/1.0",
            }

            records = []

            # Incidents and intelligence incidents are filtered by publish time;
            # indicators are filtered by last-update time.
            incidents = _collect(api_base, headers, "incidents", last_ts, account_id,
                                 page_size, max_pages, time_param="published-after")
            for incident in incidents:
                incident['_source_type'] = 'incident'
            records.extend(incidents)

            intel_incidents = _collect(api_base, headers, "intel-incidents", last_ts, account_id,
                                       page_size, max_pages, time_param="published-after")
            for intel in intel_incidents:
                intel['_source_type'] = 'intelligence_incident'
            records.extend(intel_incidents)

            indicators = _collect(api_base, headers, "indicators", last_ts, account_id,
                                  page_size, max_pages, time_param="lastUpdated-after")
            for indicator in indicators:
                indicator['_source_type'] = 'ioc'
            records.extend(indicators)

            if records:
                # Advance the checkpoint to the newest timestamp seen in this batch.
                newest = max(
                    (r.get("updated") or r.get("raised") or r.get("lastUpdated") or last_ts)
                    for r in records
                )
                key = f"{prefix}/digital_shadows_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
                # Write newline-delimited JSON, one record per line.
                body = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
                storage_client.put_object(
                    Bucket=bucket_name,
                    Key=key,
                    Body=body,
                    ContentType="application/x-ndjson",
                )
                _save_state(bucket_name, state_key, newest)
                msg = f"Wrote {len(records)} records to s3://{bucket_name}/{key}"
            else:
                msg = "No new records"

            logger.info(msg)
            return {
                'statusCode': 200,
                'body': json.dumps({'message': msg, 'records': len(records) if records else 0})
            }
        except Exception as e:
            logger.error(f"Error processing logs: {str(e)}")
            raise
  5. Go to Configuration > Environment variables > Edit > Add new environment variable.

  6. Enter the following environment variables, replacing the example values with your own.

    Environment variables

    Key             Example value
    S3_BUCKET       digital-shadows-logs
    S3_PREFIX       digital-shadows/
    STATE_KEY       digital-shadows/state.json
    DS_API_KEY      ABC123 (your 6-character API key)
    DS_API_SECRET   your-32-character-api-secret
    API_BASE        https://api.searchlight.app/v1
    DS_ACCOUNT_ID   your-account-id
    PAGE_SIZE       100
    MAX_PAGES       10
  7. After the function is created, stay on its page (or open Lambda > Functions > DigitalShadowsCollector).

  8. Select the Configuration tab.

  9. In the General configuration panel, click Edit.

  10. Change Timeout to 5 minutes (300 seconds), and click Save. You can then invoke the function once to verify its output, as sketched below.
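
Optional: before scheduling, invoke the function manually and check the response. A minimal sketch, assuming boto3 with credentials that can invoke the function:

    import json
    import boto3

    lambda_client = boto3.client("lambda")
    resp = lambda_client.invoke(
        FunctionName="DigitalShadowsCollector",
        InvocationType="RequestResponse",  # wait for the result
        Payload=json.dumps({}).encode(),
    )
    # Expect a body like {"message": "Wrote N records to s3://...", "records": N}.
    print(json.loads(resp["Payload"].read()))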

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour)
    • Target: Your Lambda function DigitalShadowsCollector
    • Name: DigitalShadowsCollector-1h
  3. Click Create schedule. A scripted equivalent is sketched below.
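
For reference, a sketch of the same schedule created through the EventBridge Scheduler API with boto3. The account ID, region, and role ARN are placeholders; the console creates the invocation role for you, but the API requires one explicitly.

    import boto3

    scheduler = boto3.client("scheduler")
    scheduler.create_schedule(
        Name="DigitalShadowsCollector-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            # Placeholder ARNs: your function, and a role Scheduler can assume
            # that is allowed to call lambda:InvokeFunction on it.
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:DigitalShadowsCollector",
            "RoleArn": "arn:aws:iam::123456789012:role/DigitalShadowsSchedulerRole",
        },
    )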

Configure a feed in Google SecOps to ingest Digital Shadows Indicators logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. On the next page, click Configure a single feed.
  4. Enter a unique name for the Feed name.
  5. Select Amazon S3 V2 as the Source type.
  6. Select Digital Shadows Indicators as the Log type.
  7. Click Next.
  8. Specify values for the following fields:

    • S3 URI: s3://digital-shadows-logs/digital-shadows/
    • Source deletion option: Select the deletion option according to your preference
    • Maximum File Age: Include files modified in the last number of days (default is 180 days)
    • Access Key ID: User access key with access to the S3 bucket
    • Secret Access Key: User secret key with access to the S3 bucket
    • Asset namespace: The asset namespace
    • Ingestion labels: The label to be applied to the events from this feed
  9. Click Next and then click Submit. To confirm that files are arriving for the feed to read, you can list the bucket prefix, as sketched below.
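
Optional: a minimal boto3 sketch to confirm NDJSON files are accumulating under the prefix the feed reads from:

    import boto3

    s3 = boto3.client("s3")
    resp = s3.list_objects_v2(Bucket="digital-shadows-logs", Prefix="digital-shadows/")
    for obj in resp.get("Contents", []):
        # Each collector run writes one file named digital_shadows_<timestamp>.json.
        print(obj["Key"], obj["LastModified"], obj["Size"])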

UDM mapping table

Log Field | UDM Mapping | Logic
value | entity.entity.file.md5 | Set if type == "MD5"
value | entity.entity.file.sha1 | Set if type == "SHA1"
value | entity.entity.file.sha256 | Set if type == "SHA256"
value | entity.entity.hostname | Set if type == "HOST"
value | entity.entity.ip | Value copied directly if type == "IP"
value | entity.entity.url | Set if type == "URL"
value | entity.entity.user.email_addresses | Value copied directly if type == "EMAIL"
type | entity.metadata.entity_type | Set to "DOMAIN_NAME" if type == "HOST", "IP_ADDRESS" if type == "IP", "URL" if type == "URL", "USER" if type == "EMAIL", "FILE" if type in ["SHA1", "SHA256", "MD5"], else "UNKNOWN_ENTITYTYPE"
lastUpdated | entity.metadata.interval.start_time | Converted from ISO8601 to timestamp if not empty
id | entity.metadata.product_entity_id | Value copied directly if not empty
attributionTag.id, attributionTag.name, attributionTag.type | entity.metadata.threat.about.labels | Merged as objects {key: tag field name, value: tag value} if not empty
sourceType | entity.metadata.threat.category_details | Value copied directly
(none) | entity.metadata.threat.threat_feed_name | Set to "Indicators"
id | entity.metadata.threat.threat_id | Value copied directly if not empty
sourceIdentifier | entity.metadata.threat.url_back_to_product | Value copied directly
(none) | entity.metadata.product_name | Set to "Indicators"
(none) | entity.metadata.vendor_name | Set to "Digital Shadows"
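
As an illustration of the table, here is a hypothetical indicator record and the UDM entity fields the mapping above would produce for it (values are invented for the example):

    # Hypothetical input record from the indicators endpoint:
    record = {
        "id": "ind-12345",
        "type": "IP",
        "value": "203.0.113.7",
        "lastUpdated": "2024-01-15T10:30:00Z",
        "sourceType": "threat-intelligence",
    }
    # Expected UDM output per the mapping table:
    #   entity.entity.ip                        = "203.0.113.7"        (type == "IP")
    #   entity.metadata.entity_type             = "IP_ADDRESS"
    #   entity.metadata.interval.start_time     = 2024-01-15T10:30:00Z (from lastUpdated)
    #   entity.metadata.product_entity_id       = "ind-12345"
    #   entity.metadata.threat.threat_id        = "ind-12345"
    #   entity.metadata.threat.category_details = "threat-intelligence"
    #   entity.metadata.threat.threat_feed_name = "Indicators"
    #   entity.metadata.product_name            = "Indicators"
    #   entity.metadata.vendor_name             = "Digital Shadows"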

Need more help? Get answers from Community members and Google SecOps professionals.
