Collect ZeroFox Platform logs

This document explains how to ingest ZeroFox Platform logs to Google Security Operations using Amazon S3.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance.
  • Privileged access to the ZeroFox Platform tenant.
  • Privileged access to AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Get ZeroFox prerequisites

  1. Sign in to the ZeroFox Platform at https://cloud.zerofox.com.
  2. Go to Data Connectors > API Data Feeds.
    • Direct URL (after login): https://cloud.zerofox.com/data_connectors/api
    • If you don't see this menu item, contact your ZeroFox administrator for access.
  3. Click Generate Token or Create Personal Access Token.
  4. Provide the following configuration details:
    • Name: Enter a descriptive name (for example, Google SecOps S3 Ingestion).
    • Expiration: Select a rotation period according to your organization's security policy.
    • Permissions/Feeds: Select read permissions for Alerts, CTI feeds, and any other data types you want to export.
  5. Click Generate.
  6. Copy and save the generated Personal Access Token in a secure location (you won't be able to view it again).
  7. Note the value to use for ZEROFOX_BASE_URL: https://api.zerofox.com (the default for most tenants).
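
Before wiring the token into AWS, it can help to see how it is presented to the API. The following is a minimal sketch that builds, but does not send, an authenticated request. The /v1/alerts path, the limit parameter, and the Bearer scheme are taken from the Lambda function later in this document; they are assumptions about your tenant's API, and build_alerts_request is a hypothetical helper name.

```python
from urllib.request import Request


def build_alerts_request(token: str, base_url: str = "https://api.zerofox.com") -> Request:
    """Build (but do not send) a GET request that carries the token as a Bearer header."""
    # Assumed endpoint: same path and parameter the Lambda function in this guide uses.
    url = f"{base_url.rstrip('/')}/v1/alerts?limit=1"
    req = Request(url, method="GET")
    req.add_header("Accept", "application/json")
    req.add_header("Authorization", f"Bearer {token}")
    return req


# "example-token" is a placeholder; substitute your Personal Access Token.
req = build_alerts_request("example-token")
print(req.get_method(), req.full_url)
```

To actually test the token, pass the request to urllib.request.urlopen with a timeout and inspect the response status.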

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, zerofox-platform-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download .CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Copy and paste the following policy.
  4. Policy JSON (replace zerofox-platform-logs if you entered a different bucket name):

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "AllowPutObjects",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
          },
          {
            "Sid": "AllowGetStateObject",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::zerofox-platform-logs/zerofox/platform/state.json"
          }
        ]
      }
    
  5. Click Next > Create policy.

  6. Go to IAM > Roles > Create role > AWS service > Lambda.

  7. Attach the newly created policy.

  8. Name the role ZeroFoxPlatformToS3Role and click Create role.
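
If your bucket name or state key differs from the example, it may be less error-prone to generate the policy than to edit it by hand. The following is a minimal sketch that reproduces the policy above for an arbitrary bucket; build_lambda_s3_policy and the bucket name my-zerofox-bucket are hypothetical, used only for illustration.

```python
import json


def build_lambda_s3_policy(bucket: str, state_key: str = "zerofox/platform/state.json") -> dict:
    """Return the upload policy from this guide with the bucket and state key substituted."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }


# Hypothetical bucket name; paste the printed JSON into the policy editor.
policy = build_lambda_s3_policy("my-zerofox-bucket")
print(json.dumps(policy, indent=2))
```

The second statement must reference the same key as the STATE_KEY environment variable of the Lambda function, otherwise the function cannot read its saved state.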

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting          Value
    Name             zerofox_platform_to_s3
    Runtime          Python 3.13
    Architecture     x86_64
    Execution role   ZeroFoxPlatformToS3Role
  4. After the function is created, open the Code tab, delete the stub, and paste the following code (zerofox_platform_to_s3.py).

      #!/usr/bin/env python3
      # Lambda: Pull ZeroFox Platform data (alerts/incidents/logs) to S3 (no transform)

      import os, json, time, urllib.parse
      from urllib.request import Request, urlopen
      from urllib.error import HTTPError, URLError
      import boto3

      S3_BUCKET = os.environ["S3_BUCKET"]
      S3_PREFIX = os.environ.get("S3_PREFIX", "zerofox/platform/")
      STATE_KEY = os.environ.get("STATE_KEY", "zerofox/platform/state.json")
      LOOKBACK_SEC = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
      PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "200"))
      MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
      HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "60"))
      HTTP_RETRIES = int(os.environ.get("HTTP_RETRIES", "3"))
      URL_TEMPLATE = os.environ.get("URL_TEMPLATE", "")
      AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g. "Authorization: Bearer <token>"
      ZEROFOX_BASE_URL = os.environ.get("ZEROFOX_BASE_URL", "https://api.zerofox.com")
      ZEROFOX_API_TOKEN = os.environ.get("ZEROFOX_API_TOKEN", "")

      s3 = boto3.client("s3")


      def _iso(ts: float) -> str:
          # Format an epoch timestamp as an ISO 8601 UTC string.
          return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))


      def _load_state() -> dict:
          # Read the saved cursor from S3; fall back to the lookback window on any error.
          try:
              obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
              b = obj["Body"].read()
              return json.loads(b) if b else {}
          except Exception:
              return {"last_since": _iso(time.time() - LOOKBACK_SEC)}


      def _save_state(st: dict) -> None:
          s3.put_object(
              Bucket=S3_BUCKET,
              Key=STATE_KEY,
              Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )


      def _headers() -> dict:
          hdrs = {"Accept": "application/json", "Content-Type": "application/json"}
          if AUTH_HEADER:
              # AUTH_HEADER takes precedence; expected form is "Name: value".
              try:
                  k, v = AUTH_HEADER.split(":", 1)
                  hdrs[k.strip()] = v.strip()
              except ValueError:
                  hdrs["Authorization"] = AUTH_HEADER.strip()
          elif ZEROFOX_API_TOKEN:
              hdrs["Authorization"] = f"Bearer {ZEROFOX_API_TOKEN}"
          return hdrs


      def _http_get(url: str) -> dict:
          # GET with retries on throttling, transient server errors, and network errors.
          attempt = 0
          while True:
              try:
                  req = Request(url, method="GET")
                  for k, v in _headers().items():
                      req.add_header(k, v)
                  with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                      body = r.read()
                      try:
                          return json.loads(body.decode("utf-8"))
                      except json.JSONDecodeError:
                          return {"raw": body.decode("utf-8", errors="replace")}
              except HTTPError as e:
                  if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                      retry_after = int(e.headers.get("Retry-After", 1 + attempt))
                      time.sleep(max(1, retry_after))
                      attempt += 1
                      continue
                  raise
              except URLError:
                  if attempt < HTTP_RETRIES:
                      time.sleep(1 + attempt)
                      attempt += 1
                      continue
                  raise


      def _put_json(obj: dict, label: str) -> str:
          ts = time.gmtime()
          # Strip any trailing slash so the default prefix does not produce "//" in the key.
          prefix = S3_PREFIX.rstrip("/")
          key = f"{prefix}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-zerofox-{label}.json"
          s3.put_object(
              Bucket=S3_BUCKET,
              Key=key,
              Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
              ContentType="application/json",
          )
          return key


      def _extract_next_token(payload: dict):
          # Try the common field names for a pagination cursor.
          next_token = (payload.get("next") or payload.get("next_token") or
                        payload.get("nextPageToken") or payload.get("next_page_token"))
          if isinstance(next_token, dict):
              return next_token.get("token") or next_token.get("cursor") or next_token.get("value")
          return next_token


      def _extract_items(payload: dict) -> list:
          # Return the first list found under a known result key.
          for key in ("results", "data", "alerts", "items", "logs", "events"):
              if isinstance(payload.get(key), list):
                  return payload[key]
          return []


      def _extract_newest_timestamp(items: list, current: str) -> str:
          # Timestamps are compared as strings, which assumes a uniform ISO 8601 format.
          newest = current
          for item in items:
              timestamp = (item.get("timestamp") or item.get("created_at") or
                           item.get("last_modified") or item.get("event_time") or
                           item.get("log_time") or item.get("updated_at"))
              if isinstance(timestamp, str) and timestamp > newest:
                  newest = timestamp
          return newest


      def lambda_handler(event=None, context=None):
          st = _load_state()
          since = st.get("last_since") or _iso(time.time() - LOOKBACK_SEC)

          # Use URL_TEMPLATE if provided, otherwise construct default alerts endpoint
          if URL_TEMPLATE:
              base_url = URL_TEMPLATE.replace("{SINCE}", urllib.parse.quote(since))
          else:
              base_url = f"{ZEROFOX_BASE_URL}/v1/alerts?since={urllib.parse.quote(since)}"

          page_token = ""
          pages = 0
          total_items = 0
          newest_since = since

          while pages < MAX_PAGES:
              # Construct URL with pagination
              if URL_TEMPLATE:
                  url = (base_url
                         .replace("{PAGE_TOKEN}", urllib.parse.quote(page_token))
                         .replace("{PAGE_SIZE}", str(PAGE_SIZE)))
              else:
                  url = f"{base_url}&limit={PAGE_SIZE}"
                  if page_token:
                      url += f"&page_token={urllib.parse.quote(page_token)}"

              payload = _http_get(url)
              _put_json(payload, f"page-{pages:05d}")

              items = _extract_items(payload)
              total_items += len(items)
              newest_since = _extract_newest_timestamp(items, newest_since)

              pages += 1
              next_token = _extract_next_token(payload)
              if not next_token:
                  break
              page_token = str(next_token)

          # Persist the cursor only when it has advanced.
          if newest_since and newest_since != st.get("last_since"):
              st["last_since"] = newest_since
              _save_state(st)

          return {"ok": True, "pages": pages, "items": total_items,
                  "since": since, "new_since": newest_since}


      if __name__ == "__main__":
          print(lambda_handler())
    
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the environment variables provided in the following table, replacing the example values with your values.

    Environment variables

    Key                 Example value
    S3_BUCKET           zerofox-platform-logs
    S3_PREFIX           zerofox/platform/
    STATE_KEY           zerofox/platform/state.json
    ZEROFOX_BASE_URL    https://api.zerofox.com
    ZEROFOX_API_TOKEN   your-zerofox-personal-access-token
    LOOKBACK_SECONDS    3600
    PAGE_SIZE           200
    MAX_PAGES           20
    HTTP_TIMEOUT        60
    HTTP_RETRIES        3
    URL_TEMPLATE        (optional) Custom URL template with {SINCE}, {PAGE_TOKEN}, {PAGE_SIZE}
    AUTH_HEADER         (optional) Authorization: Bearer <token> for custom auth
  8. After the function is created, stay on its page (or open Lambda > Functions > your-function).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
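
The optional URL_TEMPLATE variable is expanded by plain string replacement of the {SINCE}, {PAGE_TOKEN}, and {PAGE_SIZE} placeholders. The following is a minimal sketch of that expansion, mirroring the logic of the Lambda function; render_url is a hypothetical helper, and the template shown is only an illustration assembled from the default endpoint and parameter names in the function, not a documented ZeroFox URL.

```python
from urllib.parse import quote


def render_url(template: str, since: str, page_token: str, page_size: int) -> str:
    """Substitute the three placeholders, URL-encoding the timestamp and the token."""
    return (template
            .replace("{SINCE}", quote(since))
            .replace("{PAGE_TOKEN}", quote(page_token))
            .replace("{PAGE_SIZE}", str(page_size)))


# Illustrative template only; adjust the path and parameter names to your API.
template = ("https://api.zerofox.com/v1/alerts"
            "?since={SINCE}&limit={PAGE_SIZE}&page_token={PAGE_TOKEN}")

first_page = render_url(template, "2024-01-01T00:00:00Z", "", 200)
print(first_page)
```

Note that on the first page the token is empty, so a template containing {PAGE_TOKEN} yields an empty page_token parameter. If your API rejects empty parameters, this is worth testing before relying on a custom template.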

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour).
    • Target: Your Lambda function zerofox_platform_to_s3.
    • Name: zerofox-platform-1h.
  3. Click Create schedule.
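
When choosing the schedule rate, it may help to estimate how many items a single run can fetch, because each invocation stops after MAX_PAGES pages of PAGE_SIZE items. The following is a rough sketch of that arithmetic using the default values from the environment variable table; the function names are hypothetical.

```python
def max_items_per_run(page_size: int = 200, max_pages: int = 20) -> int:
    """Upper bound on items fetched by one invocation (every page full, no early stop)."""
    return page_size * max_pages


def max_items_per_day(page_size: int = 200, max_pages: int = 20, runs_per_day: int = 24) -> int:
    """Upper bound per day for a fixed-rate schedule (24 runs for a 1-hour rate)."""
    return max_items_per_run(page_size, max_pages) * runs_per_day


print(max_items_per_run())   # 200 * 20 = 4000
print(max_items_per_day())   # 4000 * 24 = 96000
```

If your tenant regularly produces more items per hour than this bound, consider raising MAX_PAGES or shortening the schedule interval. Whether items beyond the bound are picked up by the next run depends on the order in which the API returns results, which this document does not specify.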

(Optional) Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key - Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. JSON:

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
          },
          {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::zerofox-platform-logs"
          }
        ]
      }
    
  7. Name the policy secops-reader-policy.

  8. Click Create policy, search for and select the new policy, and then click Next > Add permissions.

  9. To create an access key for secops-reader, go to Security credentials > Access keys.

  10. Click Create access key.

  11. Download the .CSV file. You'll paste these values into the feed.

Configure a feed in Google SecOps to ingest ZeroFox Platform logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, ZeroFox Platform Logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select ZeroFox Platform as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://zerofox-platform-logs/zerofox/platform/
    • Source deletion options: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
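
The S3 URI entered in the feed must match the bucket and prefix that the Lambda function writes to. The following is a small sketch for checking that an object key falls under a given S3 URI; split_s3_uri and key_is_under_uri are hypothetical helpers, and the sample key follows the naming pattern produced by the Lambda function in this document.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/prefix/' into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def key_is_under_uri(key: str, uri: str) -> bool:
    """Return True if the object key starts with the prefix part of the S3 URI."""
    _, prefix = split_s3_uri(uri)
    return key.startswith(prefix)


feed_uri = "s3://zerofox-platform-logs/zerofox/platform/"
sample_key = "zerofox/platform/2024/01/01/000000-zerofox-page-00000.json"

print(split_s3_uri(feed_uri))
print(key_is_under_uri(sample_key, feed_uri))
```

With the default values in this document, the state file zerofox/platform/state.json also falls under the feed prefix, which is worth keeping in mind if you change S3_PREFIX or STATE_KEY.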

Need more help? Get answers from Community members and Google SecOps professionals.
