Collect URLScan IO logs

This document explains how to ingest URLScan IO logs to Google Security Operations using Amazon S3.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • Privileged access to your URLScan IO tenant
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge)

Get URLScan IO prerequisites

  1. Sign in to URLScan IO.
  2. Click your profile icon.
  3. Select API Key from the menu.
  4. If you don't have an API key yet:
    • Click Create API Key.
    • Enter a description for the API key (for example, Google SecOps Integration).
    • Select the permissions for the key (for read-only access, select Read permissions).
    • Click Generate API Key.
  5. Copy and save the following details in a secure location (you can verify the key with the sketch after this list):
    • API_KEY: The generated API key string (format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
    • API Base URL: https://urlscan.io/api/v1 (this is constant for all users)
  6. Note your API quota limits:
    • Free accounts: Limited to 1000 API calls per day and 60 per minute
    • Pro accounts: Higher limits based on subscription tier
  7. If you need to restrict searches to your organization's scans only, note down:
    • User identifier: Your username or email (for use with the user: search filter)
    • Team identifier: Your team name, if you use the teams feature (for use with the team: search filter)
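
Before wiring the key into AWS, it is worth confirming that it works and that quota remains. The following is a minimal sketch, assuming the API key saved in step 5; it issues a single search request with urllib3 (the same library the Lambda function below uses) and prints the HTTP status and result count. The query string and size values are illustrative only.

    import json
    import urllib3

    API_KEY = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"  # replace with your API key
    API_BASE = "https://urlscan.io/api/v1"

    http = urllib3.PoolManager()

    # Issue a small search to confirm the key is accepted and quota remains
    resp = http.request(
        "GET",
        f"{API_BASE}/search/",
        fields={"q": "date:>now-1h", "size": 1},
        headers={"API-Key": API_KEY},
    )

    print("HTTP status:", resp.status)  # 200: key works; 429: rate limit or quota exhausted
    if resp.status == 200:
        data = json.loads(resp.data.decode("utf-8"))
        print("Results in the last hour:", data.get("total"))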

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, urlscan-logs-bucket).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create access key in the Access keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for future reference (the sketch after this list exercises these credentials).
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section, then select Add permissions.
  15. Select Attach policies directly.
  16. Search for the AmazonS3FullAccess policy.
  17. Select the policy.
  18. Click Next.
  19. Click Add permissions.
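
To confirm the new access key can write to the bucket before building the rest of the pipeline, you can run a quick round-trip check. This is a minimal sketch, assuming the example bucket name urlscan-logs-bucket and the key pair from the downloaded CSV; the test object key is arbitrary.

    import boto3

    # Credentials from the downloaded CSV (placeholders); the region must match your bucket
    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",
        aws_secret_access_key="...",
        region_name="us-east-1",
    )

    bucket = "urlscan-logs-bucket"

    # Round-trip a small test object to prove write access, then clean up
    s3.put_object(Bucket=bucket, Key="urlscan/connectivity-test.txt", Body=b"ok")
    print("Write OK")
    s3.delete_object(Bucket=bucket, Key="urlscan/connectivity-test.txt")
    print("Cleanup OK")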

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json"
          }
        ]
      }
    
    • Replace urlscan-logs-bucket if you entered a different bucket name.
  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy.

  7. Name the role urlscan-lambda-role and click Create role.
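
If you prefer to script these IAM steps, the same policy and role can be created with boto3. This is a minimal sketch using the names from the steps above; the policy name urlscan-s3-upload is an illustrative choice, and the trust policy simply lets the Lambda service assume the role.

    import json
    import boto3

    iam = boto3.client("iam")

    # The upload policy from the step above
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "AllowPutObjects", "Effect": "Allow",
             "Action": "s3:PutObject",
             "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"},
            {"Sid": "AllowGetStateObject", "Effect": "Allow",
             "Action": "s3:GetObject",
             "Resource": "arn:aws:s3:::urlscan-logs-bucket/urlscan/state.json"},
        ],
    }
    policy = iam.create_policy(
        PolicyName="urlscan-s3-upload",  # hypothetical policy name
        PolicyDocument=json.dumps(policy_doc),
    )

    # Trust policy that lets the Lambda service assume the role
    trust = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": "lambda.amazonaws.com"},
                       "Action": "sts:AssumeRole"}],
    }
    iam.create_role(RoleName="urlscan-lambda-role",
                    AssumeRolePolicyDocument=json.dumps(trust))
    iam.attach_role_policy(RoleName="urlscan-lambda-role",
                           PolicyArn=policy["Policy"]["Arn"])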

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting           Value
    Name              urlscan-collector
    Runtime           Python 3.13
    Architecture      x86_64
    Execution role    urlscan-lambda-role
  4. After the function is created, open the Code tab, delete the stub, and enter the following code (urlscan-collector.py):

      import json
      import os
      import urllib3
      import boto3
      from datetime import datetime

      s3 = boto3.client('s3')
      http = urllib3.PoolManager()

      def lambda_handler(event, context):
          # Environment variables
          bucket = os.environ['S3_BUCKET']
          prefix = os.environ['S3_PREFIX']
          state_key = os.environ['STATE_KEY']
          api_key = os.environ['API_KEY']
          api_base = os.environ['API_BASE']
          search_query = os.environ.get('SEARCH_QUERY', 'date:>now-1h')
          page_size = int(os.environ.get('PAGE_SIZE', '100'))
          max_pages = int(os.environ.get('MAX_PAGES', '10'))

          # Load state to find the last successful run
          state = load_state(bucket, state_key)
          last_run = state.get('last_run')

          # Adjust the search query to cover the gap since the last run
          if last_run:
              search_time = datetime.fromisoformat(last_run)
              time_diff = datetime.utcnow() - search_time
              hours = int(time_diff.total_seconds() / 3600) + 1
              search_query = f'date:>now-{hours}h'

          # Search for scans
          headers = {'API-Key': api_key}
          all_results = []

          for page in range(max_pages):
              search_url = f"{api_base}/search/"
              params = {
                  'q': search_query,
                  'size': page_size,
                  'offset': page * page_size
              }

              # Make search request
              response = http.request('GET', search_url, fields=params, headers=headers)
              if response.status != 200:
                  print(f"Search failed: {response.status}")
                  break

              search_data = json.loads(response.data.decode('utf-8'))
              results = search_data.get('results', [])
              if not results:
                  break

              # Fetch the full result for each scan
              for result in results:
                  uuid = result.get('task', {}).get('uuid')
                  if uuid:
                      result_url = f"{api_base}/result/{uuid}/"
                      result_response = http.request('GET', result_url, headers=headers)
                      if result_response.status == 200:
                          full_result = json.loads(result_response.data.decode('utf-8'))
                          all_results.append(full_result)
                      else:
                          print(f"Failed to fetch result for {uuid}: {result_response.status}")

              # Check if we have more pages
              if len(results) < page_size:
                  break

          # Write results to S3 under a time-partitioned key
          if all_results:
              now = datetime.utcnow()
              file_key = (f"{prefix}year={now.year}/month={now.month:02d}/day={now.day:02d}"
                          f"/hour={now.hour:02d}/urlscan_{now.strftime('%Y%m%d_%H%M%S')}.json")

              # Create NDJSON content (one JSON object per line)
              ndjson_content = '\n'.join([json.dumps(r, separators=(',', ':')) for r in all_results])

              # Upload to S3
              s3.put_object(
                  Bucket=bucket,
                  Key=file_key,
                  Body=ndjson_content.encode('utf-8'),
                  ContentType='application/x-ndjson'
              )
              print(f"Uploaded {len(all_results)} results to s3://{bucket}/{file_key}")

          # Update state
          state['last_run'] = datetime.utcnow().isoformat()
          save_state(bucket, state_key, state)

          return {
              'statusCode': 200,
              'body': json.dumps({
                  'message': f'Processed {len(all_results)} scan results',
                  'location': f"s3://{bucket}/{prefix}"
              })
          }

      def load_state(bucket, key):
          try:
              response = s3.get_object(Bucket=bucket, Key=key)
              return json.loads(response['Body'].read())
          except s3.exceptions.NoSuchKey:
              return {}
          except Exception as e:
              print(f"Error loading state: {e}")
              return {}

      def save_state(bucket, key, state):
          try:
              s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state),
                            ContentType='application/json')
          except Exception as e:
              print(f"Error saving state: {e}")
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables, replacing the example values with your own:

    Key             Example value
    S3_BUCKET       urlscan-logs-bucket
    S3_PREFIX       urlscan/
    STATE_KEY       urlscan/state.json
    API_KEY         <your-api-key>
    API_BASE        https://urlscan.io/api/v1
    SEARCH_QUERY    date:>now-1h
    PAGE_SIZE       100
    MAX_PAGES       10
  8. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
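
Before scheduling the function, you can trigger it once and confirm that objects land in the bucket. This is a minimal sketch, assuming the function name urlscan-collector and the example bucket and prefix above; it invokes the function synchronously with boto3 and then lists the uploaded keys.

    import json
    import boto3

    lambda_client = boto3.client("lambda")

    # Invoke the collector once, synchronously, with an empty test event
    response = lambda_client.invoke(
        FunctionName="urlscan-collector",
        InvocationType="RequestResponse",
        Payload=json.dumps({}).encode("utf-8"),
    )
    result = json.loads(response["Payload"].read())
    print(result)  # expect statusCode 200 and a "Processed N scan results" message

    # Confirm that a file landed under the prefix
    s3 = boto3.client("s3")
    listing = s3.list_objects_v2(Bucket="urlscan-logs-bucket", Prefix="urlscan/")
    for obj in listing.get("Contents", []):
        print(obj["Key"])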

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: your Lambda function urlscan-collector.
    • Name: urlscan-collector-1h.
  3. Click Create schedule.
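
The same schedule can also be created programmatically through the EventBridge Scheduler API. The following is a minimal sketch; both ARNs are placeholders, and the execution role (which must allow Scheduler to invoke the function) is assumed to exist already.

    import boto3

    scheduler = boto3.client("scheduler")

    # Both ARNs are placeholders; replace with your account's values
    scheduler.create_schedule(
        Name="urlscan-collector-1h",
        ScheduleExpression="rate(1 hour)",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:urlscan-collector",
            # Execution role that allows EventBridge Scheduler to invoke the function
            "RoleArn": "arn:aws:iam::123456789012:role/scheduler-invoke-urlscan",
        },
    )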

Optional: Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::urlscan-logs-bucket/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::urlscan-logs-bucket"
          }
        ]
      }
     
    
  7. Set the name to secops-reader-policy.

  8. Click Create policy, then return to the Add permissions flow, search for and select secops-reader-policy, and click Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV file (these values are entered into the feed).
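
To confirm the read-only credentials work before configuring the feed, you can list the bucket with them. This is a minimal sketch, assuming the secops-reader key pair from the downloaded CSV and the example bucket name.

    import boto3

    # Read-only credentials from the secops-reader CSV (placeholders)
    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",
        aws_secret_access_key="...",
    )

    # ListBucket and GetObject are the only actions the reader policy allows
    listing = s3.list_objects_v2(Bucket="urlscan-logs-bucket", Prefix="urlscan/", MaxKeys=5)
    for obj in listing.get("Contents", []):
        print(obj["Key"], obj["Size"])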

Configure a feed in Google SecOps to ingest URLScan IO logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, URLScan IO logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select URLScan IO as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://urlscan-logs-bucket/urlscan/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. The default is 180 days.
    • Access Key ID: The user access key with access to the S3 bucket.
    • Secret Access Key: The user secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

Need more help? Get answers from Community members and Google SecOps professionals.
