Collect Google Cloud Looker audit logs

This document explains how to ingest Google Cloud Looker audit logs to Google Security Operations using Google Cloud Storage or Amazon S3.

Looker is a business intelligence and data analytics platform that enables organizations to explore, analyze, and share real-time business insights. Looker's System Activity model tracks user authentication events, query execution history, dashboard and Look access, content creation and modification, download events, API calls, scheduled delivery events, and permission changes.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • A Looker instance with admin access or the see_system_activity permission
  • Looker API credentials (Client ID and Client Secret)
  • For GCS path: A GCP project with Cloud Storage, Cloud Run, Pub/Sub, and Cloud Scheduler APIs enabled
  • For S3 path: Privileged access to AWS (S3, IAM)

Configure Looker API credentials

To enable Google SecOps to retrieve System Activity audit data, you need to create API credentials in Looker and configure a service account with the required permissions.

  1. Sign in to your Looker instance as an admin.
  2. Go to Admin > Users.
  3. Click Add Users.
  4. In the Email field, enter a service account email (for example, chronicle-integration@yourcompany.com).
  5. Under Roles, select a role that includes the see_system_activity permission.

  6. Click Save.

Generate API3 credentials

  1. Go to Admin > Users.
  2. Find the service account user you created and click Edit.
  3. Scroll down to the API Keys section.
  4. Click New API Key.
  5. Copy and save the following values in a secure location:

    • Client ID: The public identifier for API authentication
    • Client Secret: The private key for API authentication

Identify your Looker API base URL

  • Your Looker API base URL follows this format:

     https://<instance_name>.cloud.looker.com 
    

For Looker instances hosted on Google Cloud or Microsoft Azure, and for AWS-hosted instances created on or after 07/07/2020, the API uses port 443 (the default HTTPS port). Older AWS-hosted instances may use port 19999.

You can find the API Host URL by going to Admin > API in your Looker instance.
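
The port rule above can be sketched as a small helper; this is an illustration only, and the host names and the `looker_base_url` function are examples, not part of any Looker SDK:

```python
# Sketch: derive the Looker API base URL from an instance host name.
# Port 443 (default HTTPS) applies to GCP/Azure instances and newer AWS
# instances; AWS-hosted instances created before 07/07/2020 may use 19999.
def looker_base_url(host: str, legacy_aws: bool = False) -> str:
    port = ":19999" if legacy_aws else ""  # 443 is implied for HTTPS
    return f"https://{host}{port}"

print(looker_base_url("your-instance.cloud.looker.com"))
print(looker_base_url("your-instance.looker.com", legacy_aws=True))
```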

Test API access

  • Test your credentials before proceeding with the integration:

     # Replace with your actual credentials
     LOOKER_BASE_URL="https://your-instance.cloud.looker.com"
     CLIENT_ID="your-client-id"
     CLIENT_SECRET="your-client-secret"

     # Obtain access token
     TOKEN=$(curl -s -X POST "${LOOKER_BASE_URL}/api/4.0/login" \
       -d "client_id=${CLIENT_ID}&client_secret=${CLIENT_SECRET}" \
       | python3 -c "import sys,json; print(json.load(sys.stdin)['access_token'])")

     # Test System Activity access
     curl -s -X POST "${LOOKER_BASE_URL}/api/4.0/queries/run/json" \
       -H "Authorization: token ${TOKEN}" \
       -H "Content-Type: application/json" \
       -d '{"model":"system__activity","view":"event","fields":["event.name","event.created_time"],"limit":"5","sorts":["event.created_time desc"]}' \
       | python3 -m json.tool

A successful response returns a JSON array of recent Looker events.
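
The token-extraction step in the script above can also be done in plain Python; the sketch below parses an illustrative login response body (the token value is made up, not a real Looker response):

```python
import json

# Illustrative login response body. A real body comes from
# POST {LOOKER_BASE_URL}/api/4.0/login; the values here are examples.
sample_login_response = '{"access_token": "abc123", "token_type": "Bearer", "expires_in": 3600}'

token = json.loads(sample_login_response)["access_token"]
# Subsequent API calls send the token as: Authorization: token <access_token>
auth_header = f"token {token}"
print(auth_header)  # token abc123
```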

Option A: Configure ingestion using Google Cloud Storage

This option uses a Cloud Run function to poll the Looker API for System Activity audit events and write them to a GCS bucket for ingestion by Google SecOps.

Create Google Cloud Storage bucket

  1. Go to the Google Cloud Console.
  2. Select your project or create a new one.
  3. In the navigation menu, go to Cloud Storage > Buckets.
  4. Click Create bucket.
  5. Provide the following configuration details:

    Setting Value
    Name your bucket Enter a globally unique name (for example, looker-audit-logs-gcs)
    Location type Choose based on your needs (Region, Dual-region, Multi-region)
    Location Select the location (for example, us-central1)
    Storage class Standard (recommended for frequently accessed logs)
    Access control Uniform (recommended)
    Protection tools Optional: Enable object versioning or retention policy
  6. Click Create.

Create a service account

  1. In the GCP Console, go to IAM & Admin > Service Accounts.
  2. Click Create Service Account.
  3. Provide the following configuration details:
    • Service account name: Enter looker-audit-collector-sa
    • Service account description: Enter Service account for Cloud Run function to collect Looker audit logs
  4. Click Create and Continue.
  5. In the Grant this service account access to project section, add the following roles:
    1. Click Select a role.
    2. Search for and select Storage Object Admin.
    3. Click + Add another role.
    4. Search for and select Cloud Run Invoker.
    5. Click + Add another role.
    6. Search for and select Cloud Functions Invoker.
  6. Click Continue.
  7. Click Done.

Grant IAM permissions on GCS bucket

  1. Go to Cloud Storage > Buckets.
  2. Click your bucket name (looker-audit-logs-gcs).
  3. Go to the Permissions tab.
  4. Click Grant access.
  5. Provide the following configuration details:
    • Add principals: Enter the service account email ( looker-audit-collector-sa@PROJECT_ID.iam.gserviceaccount.com )
    • Assign roles: Select Storage Object Admin
  6. Click Save.

Create Pub/Sub topic

  1. In the GCP Console, go to Pub/Sub > Topics.
  2. Click Create topic.
  3. Provide the following configuration details:
    • Topic ID: Enter looker-audit-trigger
    • Leave other settings as default
  4. Click Create.

Create Cloud Run function to collect logs

  1. In the GCP Console, go to Cloud Run.
  2. Click Create service.
  3. Select Function (use an inline editor to create a function).
  4. In the Configure section, provide the following configuration details:

    Setting Value
    Service name looker-audit-collector
    Region Select the region matching your GCS bucket (for example, us-central1)
    Runtime Select Python 3.12 or later
  5. In the Trigger (optional) section:

    1. Click + Add trigger.
    2. Select Cloud Pub/Sub.
    3. In Select a Cloud Pub/Sub topic, choose looker-audit-trigger.
    4. Click Save.
  6. In the Authentication section:

    1. Select Require authentication.
    2. Check Identity and Access Management (IAM).
  7. Scroll down and expand Containers, Networking, Security.

  8. Go to the Security tab:

    • Service account: Select looker-audit-collector-sa
  9. Go to the Containers tab:

    1. Click Variables & Secrets.
    2. Click + Add variable for each environment variable:

    Variable Name Example Value Description
    GCS_BUCKET looker-audit-logs-gcs GCS bucket name
    GCS_PREFIX looker-audit Prefix for log files
    STATE_KEY looker-audit/state.json State file path
    LOOKER_BASE_URL https://your-instance.cloud.looker.com Looker API base URL
    LOOKER_CLIENT_ID your-client-id Looker API Client ID
    LOOKER_CLIENT_SECRET your-client-secret Looker API Client Secret
    LOOKBACK_HOURS 24 Initial lookback period (hours)
    PAGE_SIZE 5000 Records per API page
    MAX_PAGES 20 Max pages per query
  10. In the Variables & Secrets section, scroll down to Requests:

    • Request timeout: Enter 600 seconds (10 minutes)
  11. Go to the Settings tab:

    • In the Resourcessection:
      • Memory: Select 512 MiBor higher
      • CPU: Select 1
  12. In the Revision scalingsection:

    • Minimum number of instances: Enter 0
    • Maximum number of instances: Enter 100
  13. Click Create.

  14. Wait for the service to be created (1-2 minutes).

  15. After the service is created, the inline code editor opens automatically.

Add function code

  1. Enter main in the Entry point field.
  2. In the inline code editor, create two files:
  • main.py:

    import functions_framework
    from google.cloud import storage
    import json
    import os
    import urllib3
    import urllib.parse
    from datetime import datetime, timezone, timedelta

    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=10.0, read=60.0),
        retries=False,
    )
    storage_client = storage.Client()

    GCS_BUCKET = os.environ.get('GCS_BUCKET')
    GCS_PREFIX = os.environ.get('GCS_PREFIX', 'looker-audit')
    STATE_KEY = os.environ.get('STATE_KEY', 'looker-audit/state.json')
    LOOKER_BASE_URL = os.environ.get('LOOKER_BASE_URL', '').rstrip('/')
    CLIENT_ID = os.environ.get('LOOKER_CLIENT_ID')
    CLIENT_SECRET = os.environ.get('LOOKER_CLIENT_SECRET')
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '5000'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '20'))


    @functions_framework.cloud_event
    def main(cloud_event):
        if not all([GCS_BUCKET, LOOKER_BASE_URL, CLIENT_ID, CLIENT_SECRET]):
            print('Error: Missing required environment variables')
            return
        try:
            bucket = storage_client.bucket(GCS_BUCKET)
            state = load_state(bucket)
            now = datetime.now(timezone.utc)
            if isinstance(state, dict) and state.get('last_event_time'):
                try:
                    last_val = state['last_event_time']
                    if last_val.endswith('Z'):
                        last_val = last_val[:-1] + '+00:00'
                    last_time = datetime.fromisoformat(last_val)
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    print(f"Warning: Could not parse last_event_time: {e}")
                    last_time = now - timedelta(hours=LOOKBACK_HOURS)
            else:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)

            print(f"Fetching events from {last_time.isoformat()} to {now.isoformat()}")
            token = get_access_token()

            events = fetch_system_activity(
                token, 'event',
                ['event.id', 'event.name', 'event.category', 'event.created_time',
                 'event.is_api_call', 'event.is_admin', 'event.is_looker_employee',
                 'user.id', 'user.name', 'user.email'],
                'event.created_time', last_time, now)
            history = fetch_system_activity(
                token, 'history',
                ['history.id', 'history.created_time', 'history.completed_time',
                 'history.status', 'history.source', 'history.issuer_source',
                 'history.runtime', 'history.message', 'query.id', 'query.model',
                 'query.view', 'user.id', 'user.name', 'user.email',
                 'dashboard.id', 'dashboard.title', 'look.id', 'look.title'],
                'history.created_time', last_time, now)

            all_records = []
            for e in events:
                e['_looker_record_type'] = 'event'
                all_records.append(e)
            for h in history:
                h['_looker_record_type'] = 'history'
                all_records.append(h)

            if not all_records:
                print("No new records found.")
                save_state(bucket, now.isoformat())
                return

            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{GCS_PREFIX}/looker_audit_{timestamp}.ndjson"
            blob = bucket.blob(object_key)
            ndjson = '\n'.join(
                [json.dumps(r, ensure_ascii=False, default=str) for r in all_records]
            ) + '\n'
            blob.upload_from_string(ndjson, content_type='application/x-ndjson')
            print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}")

            newest = find_newest_time(events, history)
            save_state(bucket, newest if newest else now.isoformat())
            print(f"Successfully processed {len(all_records)} records "
                  f"(events: {len(events)}, history: {len(history)})")
        except Exception as e:
            print(f'Error processing logs: {str(e)}')
            raise


    def get_access_token():
        url = f"{LOOKER_BASE_URL}/api/4.0/login"
        encoded_body = urllib.parse.urlencode({
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET
        }).encode('utf-8')
        response = http.request(
            'POST', url,
            body=encoded_body,
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )
        if response.status != 200:
            raise Exception(f"Looker login failed: {response.status} - "
                            f"{response.data.decode('utf-8')}")
        data = json.loads(response.data.decode('utf-8'))
        token = data.get('access_token')
        if not token:
            raise Exception("No access_token in login response")
        print("Successfully obtained Looker API access token")
        return token


    def fetch_system_activity(token, view, fields, time_field, start_time, end_time):
        start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
        end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
        all_records = []
        offset = 0
        for page in range(MAX_PAGES):
            query_body = {
                "model": "system__activity",
                "view": view,
                "fields": fields,
                "filters": {time_field: f"{start_str} to {end_str}"},
                "sorts": [f"{time_field} asc"],
                "limit": str(PAGE_SIZE),
                "offset": str(offset)
            }
            url = f"{LOOKER_BASE_URL}/api/4.0/queries/run/json"
            response = http.request(
                'POST', url,
                body=json.dumps(query_body).encode('utf-8'),
                headers={'Authorization': f'token {token}',
                         'Content-Type': 'application/json'}
            )
            if response.status == 429:
                print(f"Rate limited on {view} query. Stopping pagination.")
                break
            if response.status != 200:
                print(f"{view} query failed: {response.status} - "
                      f"{response.data.decode('utf-8')}")
                break
            page_results = json.loads(response.data.decode('utf-8'))
            if not page_results:
                break
            all_records.extend(page_results)
            print(f"{view} page {page + 1}: {len(page_results)} records "
                  f"(total: {len(all_records)})")
            if len(page_results) < PAGE_SIZE:
                break
            offset += PAGE_SIZE
        print(f"Total {view} records fetched: {len(all_records)}")
        return all_records


    def find_newest_time(events, history):
        newest = None
        for e in events:
            t = e.get('event.created_time')
            if t and (newest is None or t > newest):
                newest = t
        for h in history:
            t = h.get('history.created_time')
            if t and (newest is None or t > newest):
                newest = t
        return newest


    def load_state(bucket):
        try:
            blob = bucket.blob(STATE_KEY)
            if blob.exists():
                return json.loads(blob.download_as_text())
        except Exception as e:
            print(f"Warning: Could not load state: {e}")
        return {}


    def save_state(bucket, last_event_time_iso):
        try:
            state = {
                'last_event_time': last_event_time_iso,
                'last_run': datetime.now(timezone.utc).isoformat()
            }
            blob = bucket.blob(STATE_KEY)
            blob.upload_from_string(json.dumps(state, indent=2),
                                    content_type='application/json')
            print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
            print(f"Warning: Could not save state: {e}")

  • requirements.txt:

     functions-framework==3.*
    google-cloud-storage==2.*
    urllib3>=2.0.0 
    
  3. Click Deploy to save and deploy the function.
  4. Wait for deployment to complete (2-3 minutes).
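
The save_state helper in main.py checkpoints the newest record time so the next run only fetches new data. A sketch of the object it writes to STATE_KEY (the timestamp values here are examples):

```python
import json
from datetime import datetime, timezone

# Example of the checkpoint object written by save_state in main.py.
# Real values are ISO-8601 UTC timestamps produced at run time.
state = {
    "last_event_time": "2024-01-01T12:00:00+00:00",  # newest record seen
    "last_run": datetime.now(timezone.utc).isoformat(),
}
print(json.dumps(state, indent=2))
```

On the next run the function re-reads this file, subtracts a 2-minute overlap from last_event_time, and queries from there.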

Create Cloud Scheduler job

  1. In the GCP Console, go to Cloud Scheduler.
  2. Click Create Job.
  3. Provide the following configuration details:

    Setting Value
    Name looker-audit-collector-hourly
    Region Select same region as Cloud Run function
    Frequency 0 * * * * (every hour, on the hour)
    Timezone Select timezone (UTC recommended)
    Target type Pub/Sub
    Topic Select looker-audit-trigger
    Message body {} (empty JSON object)
  4. Click Create.

Test the integration

  1. In the Cloud Scheduler console, find your job (looker-audit-collector-hourly).
  2. Click Force run to trigger the job manually.
  3. Wait a few seconds.
  4. Go to Cloud Run > Services.
  5. Click looker-audit-collector.
  6. Click the Logs tab.
  7. Verify the function executed successfully. Look for:

     Fetching events from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Successfully obtained Looker API access token
    event page 1: X records (total: X)
    history page 1: X records (total: X)
    Wrote X records to gs://looker-audit-logs-gcs/looker-audit/looker_audit_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records (events: X, history: X) 
    
  8. Go to Cloud Storage > Buckets.

  9. Click looker-audit-logs-gcs.

  10. Navigate to the looker-audit/ folder.

  11. Verify that a new .ndjson file was created with the current timestamp.

If you see errors in the logs:

  • HTTP 401: Verify the LOOKER_CLIENT_ID and LOOKER_CLIENT_SECRET environment variables are correct
  • HTTP 403: Verify the Looker user has the see_system_activity permission
  • HTTP 429: Rate limiting — the function will stop pagination and resume on the next scheduled run
  • Missing environment variables: Verify all required variables are set in the Cloud Run function configuration
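
On HTTP 429, the function simply stops paginating and relies on the next scheduled run. If you would rather retry within a run, exponential backoff is a common alternative; a minimal sketch (the base delay and cap are arbitrary example values, not settings used by the function above):

```python
# Sketch: exponential backoff delays for retrying after HTTP 429.
# base and cap are arbitrary example values (seconds).
def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> list:
    # Delay doubles each attempt, capped so a long retry chain stays bounded.
    return [min(cap, base * (2 ** i)) for i in range(retries)]

print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

A caller would sleep for each delay in turn before re-issuing the request, keeping total retry time under Cloud Run's request timeout.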

Configure a feed in Google SecOps to ingest the Looker logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. Click Configure a single feed.
  4. In the Feed name field, enter a name for the feed (for example, Looker Audit Logs GCS).
  5. Select Google Cloud Storage V2 as the Source type.
  6. Select Looker Audit as the Log type.
  7. Click Get Service Account. A unique service account email is displayed, for example:

     chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com 
    
  8. Copy this email address for use in the next step.

  9. Click Next.

  10. Specify values for the following input parameters:

    • Storage bucket URL: Enter the GCS bucket URI with the prefix path:

       gs://looker-audit-logs-gcs/looker-audit/ 
      
    • Source deletion option: Select the deletion option according to your preference:

      • Never: Never deletes any files after transfers (recommended for testing).
      • Delete transferred files: Deletes files after successful transfer.
      • Delete transferred files and empty directories: Deletes files and empty directories after successful transfer.

    • Maximum File Age: Include files modified in the last number of days (default is 180 days)

    • Asset namespace: The asset namespace

    • Ingestion labels: The label to be applied to the events from this feed

  11. Click Next.

  12. Review your new feed configuration on the Finalize screen, and then click Submit.

Grant the Google SecOps service account access to the GCS bucket

  1. Go to Cloud Storage > Buckets.
  2. Click looker-audit-logs-gcs.
  3. Go to the Permissions tab.
  4. Click Grant access.
  5. Provide the following configuration details:
    • Add principals: Paste the Google SecOps service account email
    • Assign roles: Select Storage Object Viewer
  6. Click Save.

Option B: Configure ingestion using Amazon S3

This option uses an AWS Lambda function to poll the Looker API for System Activity audit events and write them to an S3 bucket for ingestion by Google SecOps.

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket
  2. Save the bucket Name and Region for future reference (for example, looker-audit-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create access key in the Access keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add description tag.
  10. Click Create access key.
  11. Click Download .csv file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies > Create policy > JSON tab.
  2. Copy and paste the policy below.
  3. Policy JSON (replace looker-audit-logs if you entered a different bucket name):

     {
       "Version": "2012-10-17",
       "Statement": [
         {
           "Sid": "AllowPutObjects",
           "Effect": "Allow",
           "Action": "s3:PutObject",
           "Resource": "arn:aws:s3:::looker-audit-logs/*"
         },
         {
           "Sid": "AllowGetStateObject",
           "Effect": "Allow",
           "Action": "s3:GetObject",
           "Resource": "arn:aws:s3:::looker-audit-logs/looker-audit/state.json"
         }
       ]
     }

  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy.

  7. Name the role LookerAuditCollectorRole and click Create role.
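
If you used a different bucket name or state key, you can render the same policy document programmatically instead of editing the ARNs by hand; a minimal sketch (the bucket and key names mirror the examples in this guide):

```python
import json

# Sketch: render the S3 policy above for an arbitrary bucket and state key.
# The default state_key mirrors the example path used in this guide.
def render_policy(bucket: str, state_key: str = "looker-audit/state.json") -> str:
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }
    return json.dumps(policy, indent=2)

print(render_policy("looker-audit-logs"))
```

Paste the printed JSON into the policy editor in place of the document above.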

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    Setting Value
    Name LookerAuditCollector
    Runtime Python 3.13
    Architecture x86_64
    Execution role LookerAuditCollectorRole
  4. After the function is created, open the Code tab, delete the stub, and paste the code below (LookerAuditCollector.py).

    import urllib3
    import json
    import boto3
    import os
    from datetime import datetime, timezone, timedelta
    import logging
    import urllib.parse

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=10.0, read=60.0),
        retries=False,
    )
    s3 = boto3.client('s3')

    BUCKET = os.environ['S3_BUCKET']
    PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    LOOKER_BASE_URL = os.environ['LOOKER_BASE_URL'].rstrip('/')
    CLIENT_ID = os.environ['LOOKER_CLIENT_ID']
    CLIENT_SECRET = os.environ['LOOKER_CLIENT_SECRET']
    LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24'))
    PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '5000'))
    MAX_PAGES = int(os.environ.get('MAX_PAGES', '20'))


    def lambda_handler(event, context):
        try:
            state = load_state()
            now = datetime.now(timezone.utc)
            if state and state.get('last_event_time'):
                try:
                    last_time = datetime.fromisoformat(
                        state['last_event_time'].replace('Z', '+00:00'))
                    last_time = last_time - timedelta(minutes=2)
                except Exception as e:
                    logger.warning(f"Could not parse last_event_time: {e}")
                    last_time = now - timedelta(hours=LOOKBACK_HOURS)
            else:
                last_time = now - timedelta(hours=LOOKBACK_HOURS)

            logger.info(f"Fetching events from {last_time.isoformat()} to {now.isoformat()}")
            token = get_access_token()
            events = fetch_events(token, last_time, now)
            history = fetch_history(token, last_time, now)

            all_records = []
            for e in events:
                e['_looker_record_type'] = 'event'
                all_records.append(e)
            for h in history:
                h['_looker_record_type'] = 'history'
                all_records.append(h)

            if not all_records:
                logger.info("No new records found.")
                save_state(now.isoformat())
                return {'statusCode': 200, 'body': json.dumps({'events': 0})}

            timestamp = now.strftime('%Y%m%d_%H%M%S')
            object_key = f"{PREFIX}/looker_audit_{timestamp}.ndjson"
            ndjson = '\n'.join(
                [json.dumps(r, ensure_ascii=False, default=str) for r in all_records]
            ) + '\n'
            s3.put_object(
                Bucket=BUCKET,
                Key=object_key,
     , 
     Body 
     = 
     ndjson 
     . 
     encode 
     ( 
     'utf-8' 
     ), 
     ContentType 
     = 
     'application/x-ndjson' 
     ) 
     logger 
     . 
     info 
     ( 
     f 
     "Wrote 
     { 
     len 
     ( 
     all_records 
     ) 
     } 
     records to s3:// 
     { 
     BUCKET 
     } 
     / 
     { 
     object_key 
     } 
     " 
     ) 
     newest_time 
     = 
     find_newest_time 
     ( 
     events 
     , 
     history 
     ) 
     save_state 
     ( 
     newest_time 
     if 
     newest_time 
     else 
     now 
     . 
     isoformat 
     ()) 
     return 
     { 
     'statusCode' 
     : 
     200 
     , 
     'body' 
     : 
     json 
     . 
     dumps 
     ({ 
     'events' 
     : 
     len 
     ( 
     events 
     ), 
     'history' 
     : 
     len 
     ( 
     history 
     ), 
     'total' 
     : 
     len 
     ( 
     all_records 
     ) 
     }) 
     } 
     except 
     Exception 
     as 
     e 
     : 
     logger 
     . 
     error 
     ( 
     f 
     "Lambda execution failed: 
     { 
     str 
     ( 
     e 
     ) 
     } 
     " 
     ) 
     raise 
     def 
      
     get_access_token 
     (): 
     url 
     = 
     f 
     " 
     { 
     LOOKER_BASE_URL 
     } 
     /api/4.0/login" 
     encoded_body 
     = 
     urllib 
     . 
     parse 
     . 
     urlencode 
     ({ 
     'client_id' 
     : 
     CLIENT_ID 
     , 
     'client_secret' 
     : 
     CLIENT_SECRET 
     }) 
     . 
     encode 
     ( 
     'utf-8' 
     ) 
     response 
     = 
     http 
     . 
     request 
     ( 
     'POST' 
     , 
     url 
     , 
     body 
     = 
     encoded_body 
     , 
     headers 
     = 
     { 
     'Content-Type' 
     : 
     'application/x-www-form-urlencoded' 
     } 
     ) 
     if 
     response 
     . 
     status 
     != 
     200 
     : 
     raise 
     Exception 
     ( 
     f 
     "Login failed with status 
     { 
     response 
     . 
     status 
     } 
     : " 
     f 
     " 
     { 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     ) 
     } 
     " 
     ) 
     data 
     = 
     json 
     . 
     loads 
     ( 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     )) 
     token 
     = 
     data 
     . 
     get 
     ( 
     'access_token' 
     ) 
     if 
     not 
     token 
     : 
     raise 
     Exception 
     ( 
     "No access_token in login response" 
     ) 
     logger 
     . 
     info 
     ( 
     "Successfully obtained Looker API access token" 
     ) 
     return 
     token 
     def 
      
     fetch_events 
     ( 
     token 
     , 
     start_time 
     , 
     end_time 
     ): 
     start_str 
     = 
     start_time 
     . 
     strftime 
     ( 
     '%Y-%m- 
     %d 
     %H:%M:%S' 
     ) 
     end_str 
     = 
     end_time 
     . 
     strftime 
     ( 
     '%Y-%m- 
     %d 
     %H:%M:%S' 
     ) 
     all_events 
     = 
     [] 
     offset 
     = 
     0 
     for 
     page 
     in 
     range 
     ( 
     MAX_PAGES 
     ): 
     query_body 
     = 
     { 
     "model" 
     : 
     "system__activity" 
     , 
     "view" 
     : 
     "event" 
     , 
     "fields" 
     : 
     [ 
     "event.id" 
     , 
     "event.name" 
     , 
     "event.category" 
     , 
     "event.created_time" 
     , 
     "event.is_api_call" 
     , 
     "event.is_admin" 
     , 
     "event.is_looker_employee" 
     , 
     "user.id" 
     , 
     "user.name" 
     , 
     "user.email" 
     ], 
     "filters" 
     : 
     { 
     "event.created_time" 
     : 
     f 
     " 
     { 
     start_str 
     } 
     to 
     { 
     end_str 
     } 
     " 
     }, 
     "sorts" 
     : 
     [ 
     "event.created_time asc" 
     ], 
     "limit" 
     : 
     str 
     ( 
     PAGE_SIZE 
     ), 
     "offset" 
     : 
     str 
     ( 
     offset 
     ) 
     } 
     url 
     = 
     f 
     " 
     { 
     LOOKER_BASE_URL 
     } 
     /api/4.0/queries/run/json" 
     response 
     = 
     http 
     . 
     request 
     ( 
     'POST' 
     , 
     url 
     , 
     body 
     = 
     json 
     . 
     dumps 
     ( 
     query_body 
     ) 
     . 
     encode 
     ( 
     'utf-8' 
     ), 
     headers 
     = 
     { 
     'Authorization' 
     : 
     f 
     'token 
     { 
     token 
     } 
     ' 
     , 
     'Content-Type' 
     : 
     'application/json' 
     } 
     ) 
     if 
     response 
     . 
     status 
     == 
     429 
     : 
     logger 
     . 
     warning 
     ( 
     "Rate limited on events query. Stopping pagination." 
     ) 
     break 
     if 
     response 
     . 
     status 
     != 
     200 
     : 
     logger 
     . 
     error 
     ( 
     f 
     "Events query failed: 
     { 
     response 
     . 
     status 
     } 
     - " 
     f 
     " 
     { 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     ) 
     } 
     " 
     ) 
     break 
     page_results 
     = 
     json 
     . 
     loads 
     ( 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     )) 
     if 
     not 
     page_results 
     : 
     logger 
     . 
     info 
     ( 
     f 
     "Events: No more results at offset 
     { 
     offset 
     } 
     " 
     ) 
     break 
     all_events 
     . 
     extend 
     ( 
     page_results 
     ) 
     logger 
     . 
     info 
     ( 
     f 
     "Events page 
     { 
     page 
      
     + 
      
     1 
     } 
     : Retrieved 
     { 
     len 
     ( 
     page_results 
     ) 
     } 
     records " 
     f 
     "(total: 
     { 
     len 
     ( 
     all_events 
     ) 
     } 
     )" 
     ) 
     if 
     len 
     ( 
     page_results 
     ) 
    < PAGE_SIZE 
     : 
     break 
     offset 
     += 
     PAGE_SIZE 
     logger 
     . 
     info 
     ( 
     f 
     "Total events fetched: 
     { 
     len 
     ( 
     all_events 
     ) 
     } 
     " 
     ) 
     return 
     all_events 
     def 
      
     fetch_history 
     ( 
     token 
     , 
     start_time 
     , 
     end_time 
     ): 
     start_str 
     = 
     start_time 
     . 
     strftime 
     ( 
     '%Y-%m- 
     %d 
     %H:%M:%S' 
     ) 
     end_str 
     = 
     end_time 
     . 
     strftime 
     ( 
     '%Y-%m- 
     %d 
     %H:%M:%S' 
     ) 
     all_history 
     = 
     [] 
     offset 
     = 
     0 
     for 
     page 
     in 
     range 
     ( 
     MAX_PAGES 
     ): 
     query_body 
     = 
     { 
     "model" 
     : 
     "system__activity" 
     , 
     "view" 
     : 
     "history" 
     , 
     "fields" 
     : 
     [ 
     "history.id" 
     , 
     "history.created_time" 
     , 
     "history.completed_time" 
     , 
     "history.status" 
     , 
     "history.source" 
     , 
     "history.issuer_source" 
     , 
     "history.runtime" 
     , 
     "history.message" 
     , 
     "query.id" 
     , 
     "query.model" 
     , 
     "query.view" 
     , 
     "user.id" 
     , 
     "user.name" 
     , 
     "user.email" 
     , 
     "dashboard.id" 
     , 
     "dashboard.title" 
     , 
     "look.id" 
     , 
     "look.title" 
     ], 
     "filters" 
     : 
     { 
     "history.created_time" 
     : 
     f 
     " 
     { 
     start_str 
     } 
     to 
     { 
     end_str 
     } 
     " 
     }, 
     "sorts" 
     : 
     [ 
     "history.created_time asc" 
     ], 
     "limit" 
     : 
     str 
     ( 
     PAGE_SIZE 
     ), 
     "offset" 
     : 
     str 
     ( 
     offset 
     ) 
     } 
     url 
     = 
     f 
     " 
     { 
     LOOKER_BASE_URL 
     } 
     /api/4.0/queries/run/json" 
     response 
     = 
     http 
     . 
     request 
     ( 
     'POST' 
     , 
     url 
     , 
     body 
     = 
     json 
     . 
     dumps 
     ( 
     query_body 
     ) 
     . 
     encode 
     ( 
     'utf-8' 
     ), 
     headers 
     = 
     { 
     'Authorization' 
     : 
     f 
     'token 
     { 
     token 
     } 
     ' 
     , 
     'Content-Type' 
     : 
     'application/json' 
     } 
     ) 
     if 
     response 
     . 
     status 
     == 
     429 
     : 
     logger 
     . 
     warning 
     ( 
     "Rate limited on history query. Stopping pagination." 
     ) 
     break 
     if 
     response 
     . 
     status 
     != 
     200 
     : 
     logger 
     . 
     error 
     ( 
     f 
     "History query failed: 
     { 
     response 
     . 
     status 
     } 
     - " 
     f 
     " 
     { 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     ) 
     } 
     " 
     ) 
     break 
     page_results 
     = 
     json 
     . 
     loads 
     ( 
     response 
     . 
     data 
     . 
     decode 
     ( 
     'utf-8' 
     )) 
     if 
     not 
     page_results 
     : 
     logger 
     . 
     info 
     ( 
     f 
     "History: No more results at offset 
     { 
     offset 
     } 
     " 
     ) 
     break 
     all_history 
     . 
     extend 
     ( 
     page_results 
     ) 
     logger 
     . 
     info 
     ( 
     f 
     "History page 
     { 
     page 
      
     + 
      
     1 
     } 
     : Retrieved 
     { 
     len 
     ( 
     page_results 
     ) 
     } 
     records " 
     f 
     "(total: 
     { 
     len 
     ( 
     all_history 
     ) 
     } 
     )" 
     ) 
     if 
     len 
     ( 
     page_results 
     ) 
    < PAGE_SIZE 
     : 
     break 
     offset 
     += 
     PAGE_SIZE 
     logger 
     . 
     info 
     ( 
     f 
     "Total history records fetched: 
     { 
     len 
     ( 
     all_history 
     ) 
     } 
     " 
     ) 
     return 
     all_history 
     def 
      
     find_newest_time 
     ( 
     events 
     , 
     history 
     ): 
     newest 
     = 
     None 
     for 
     e 
     in 
     events 
     : 
     t 
     = 
     e 
     . 
     get 
     ( 
     'event.created_time' 
     ) 
     if 
     t 
     and 
     ( 
     newest 
     is 
     None 
     or 
     t 
    > newest 
     ): 
     newest 
     = 
     t 
     for 
     h 
     in 
     history 
     : 
     t 
     = 
     h 
     . 
     get 
     ( 
     'history.created_time' 
     ) 
     if 
     t 
     and 
     ( 
     newest 
     is 
     None 
     or 
     t 
    > newest 
     ): 
     newest 
     = 
     t 
     return 
     newest 
     def 
      
     load_state 
     (): 
     try 
     : 
     obj 
     = 
     s3 
     . 
     get_object 
     ( 
     Bucket 
     = 
     BUCKET 
     , 
     Key 
     = 
     STATE_KEY 
     ) 
     return 
     json 
     . 
     loads 
     ( 
     obj 
     [ 
     'Body' 
     ] 
     . 
     read 
     () 
     . 
     decode 
     ( 
     'utf-8' 
     )) 
     except 
     s3 
     . 
     exceptions 
     . 
     NoSuchKey 
     : 
     logger 
     . 
     info 
     ( 
     "No previous state found, starting fresh" 
     ) 
     return 
     None 
     except 
     Exception 
     as 
     e 
     : 
     logger 
     . 
     warning 
     ( 
     f 
     "Could not load state: 
     { 
     e 
     } 
     " 
     ) 
     return 
     None 
     def 
      
     save_state 
     ( 
     last_event_time 
     ): 
     state 
     = 
     { 
     'last_event_time' 
     : 
     last_event_time 
     , 
     'last_run' 
     : 
     datetime 
     . 
     now 
     ( 
     timezone 
     . 
     utc 
     ) 
     . 
     isoformat 
     () 
     } 
     s3 
     . 
     put_object 
     ( 
     Bucket 
     = 
     BUCKET 
     , 
     Key 
     = 
     STATE_KEY 
     , 
     Body 
     = 
     json 
     . 
     dumps 
     ( 
     state 
     , 
     indent 
     = 
     2 
     ) 
     . 
     encode 
     ( 
     'utf-8' 
     ), 
     ContentType 
     = 
     'application/json' 
     ) 
     logger 
     . 
     info 
     ( 
     f 
     "Saved state: last_event_time= 
     { 
     last_event_time 
     } 
     " 
     ) 
     
    
  5. Go to Configuration > Environment variables > Edit > Add new environment variable.

  6. Enter the following environment variables, replacing the example values with your own.

Environment variables

Key | Example value
S3_BUCKET | looker-audit-logs
S3_PREFIX | looker-audit/
STATE_KEY | looker-audit/state.json
LOOKER_BASE_URL | https://your-instance.cloud.looker.com
LOOKER_CLIENT_ID | your-looker-client-id
LOOKER_CLIENT_SECRET | your-looker-client-secret
LOOKBACK_HOURS | 24
PAGE_SIZE | 5000
MAX_PAGES | 20
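If you prefer to set these variables programmatically rather than through the console, the boto3 Lambda API supports the same operation. This is an illustrative sketch only: it assumes the function is named LookerAuditCollector (yours may differ) and that AWS credentials are already configured.

```python
# All environment variables the collector reads, with the example values
# from the table above. Lambda requires string values.
ENV_VARS = {
    'S3_BUCKET': 'looker-audit-logs',
    'S3_PREFIX': 'looker-audit/',
    'STATE_KEY': 'looker-audit/state.json',
    'LOOKER_BASE_URL': 'https://your-instance.cloud.looker.com',
    'LOOKER_CLIENT_ID': 'your-looker-client-id',
    'LOOKER_CLIENT_SECRET': 'your-looker-client-secret',
    'LOOKBACK_HOURS': '24',
    'PAGE_SIZE': '5000',
    'MAX_PAGES': '20',
}


def apply_environment(function_name='LookerAuditCollector'):
    """Push ENV_VARS to the Lambda function (requires AWS credentials)."""
    import boto3  # imported here so ENV_VARS can be inspected without boto3 installed
    boto3.client('lambda').update_function_configuration(
        FunctionName=function_name,
        Environment={'Variables': ENV_VARS},
    )
```

Note that update_function_configuration replaces the whole Variables map, so include every variable on each call.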
  1. After the function is created, stay on its page (or open Lambda > Functions > your-function).
  2. Select the Configuration tab.
  3. In the General configuration panel, click Edit.
  4. Change Timeout to 5 minutes (300 seconds) and click Save.

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (1 hour)
    • Target: Your Lambda function LookerAuditCollector
    • Name: LookerAuditCollector-1h
  3. Click Create schedule.
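The same schedule can be created with the boto3 EventBridge Scheduler API. This is a sketch under assumptions: the Lambda ARN and the scheduler execution role ARN are placeholders you must supply, and the role must be allowed to invoke the function.

```python
# Schedule settings matching the console steps above.
SCHEDULE = {
    'Name': 'LookerAuditCollector-1h',
    'ScheduleExpression': 'rate(1 hour)',
    'FlexibleTimeWindow': {'Mode': 'OFF'},  # fire exactly on schedule
}


def create_hourly_schedule(lambda_arn, role_arn):
    """Create the recurring schedule (requires AWS credentials and permissions).

    lambda_arn and role_arn are placeholders for your function ARN and a
    scheduler execution role that can invoke it.
    """
    import boto3  # imported here so SCHEDULE can be inspected without boto3 installed
    boto3.client('scheduler').create_schedule(
        Target={'Arn': lambda_arn, 'RoleArn': role_arn},
        **SCHEDULE,
    )
```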

Configure a feed in Google SecOps to ingest Looker audit logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. On the next page, click Configure a single feed.
  4. Enter a unique name in the Feed name field.
  5. Select Amazon S3 V2 as the Source type.
  6. Select Looker Audit as the Log type.
  7. Click Next, and then click Submit.
  8. Specify values for the following fields:

    • S3 URI: s3://looker-audit-logs/looker-audit/
    • Source deletion option: Select the deletion option according to your preference.
    • Maximum File Age: Include files modified within the last number of days (default is 180 days).
    • Access Key ID: The user access key with access to the S3 bucket.
    • Secret Access Key: The user secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label to be applied to the events from this feed.
  9. Click Next, and then click Submit.

Looker System Activity data reference

The following table describes the key data available from Looker System Activity Explores that are collected by this integration:

Explore | Data collected | Retention
Event | User authentication events, content creation and modification, permission changes, API calls, scheduled delivery events, download events | 90 days (default)
History | Query execution history, dashboard and Look access, query runtime and status, source of query (UI, API, schedule) | 90 days (default)

UDM mapping table

Log Field | UDM Mapping | Logic
Group Edit Link | additional.fields.Group_Edit_Link_label.value.string_value | Value copied directly
Group ID | additional.fields.Group_ID_label.value.string_value | Value copied directly
History Most Recent Run Length in Seconds | additional.fields.History_Most_Recent_Run_Length_in_Seconds_label.value.string_value | Value copied directly
History Slug | additional.fields.History_Slug_label.value.string_value | Value copied directly
History Source | additional.fields.History_Source_label.value.string_value | Value copied directly
History Status | additional.fields.History_Status_label.value.string_value | Value copied directly
Look Link | additional.fields.Look_Link_label.value.string_value | Value copied directly
Look Title | additional.fields.Look_Title_label.value.string_value | Value copied directly
User Edit Link | additional.fields.User_Edit_Link_label.value.string_value | Value copied directly
User Home Folder | additional.fields.User_Home_Folder_label.value.string_value | Value copied directly
dashboard.link | additional.fields.dashboard_link_label.value.string_value | Value copied directly
dashboard.title | additional.fields.dashboard_title_label.value.string_value | Value copied directly
history.source | additional.fields.history_source_label.value.string_value | Value copied directly
history.status | additional.fields.history_status_label.value.string_value | Value copied directly
history.id | additional.fields.id_label.value.string_value | Converted to string
history.connection_name | additional.fields.name_label.value.string_value | Value copied directly
query.model | additional.fields.query_model_label.value.string_value | Value copied directly
query.view | additional.fields.query_view_label.value.string_value | Value copied directly
sql_text.text | additional.fields.sql_text_text_label.value.string_value | Value copied directly
History Created Time | metadata.event_timestamp | Parsed using ISO 8601, RFC 3339, or yyyy-MM-dd HH:mm:ss format
has_principal_user | metadata.event_type | Set to "NETWORK_CONNECTION" if has_principal and has_target are true; "USER_UNCATEGORIZED" if has_principal_user is true; "STATUS_UPDATE" if has_principal is true; otherwise "GENERIC_EVENT"
has_principal | metadata.event_type | Same logic as has_principal_user
has_target | metadata.event_type | Same logic as has_principal_user
User Email | principal.email | Value from User Email if not empty; otherwise user.email
user.email | principal.email | Same logic as User Email
Group Name | principal.group.group_display_name | Value copied directly
User ID | principal.user.product_object_id | Value from User ID if not empty; otherwise user.id
user.id | principal.user.product_object_id | Same logic as User ID
User Name | principal.user.userid | Value from User Name if not empty; otherwise user.name
user.name | principal.user.userid | Same logic as User Name
Kevin Liu | security_result.category_details | Merged with "Kevin_Liu_label" if Kevin Liu is not empty, "History_ID_label" if History ID is not empty, and "History_Created_Date_label" if History Created Date is not empty
History ID | security_result.category_details | Same logic as Kevin Liu
History Created Date | security_result.category_details | Same logic as Kevin Liu
Look Description | security_result.description | Value copied directly
User Name sorted | target.hostname | Value copied if User Dev Branch Name is not empty
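The additional.fields labels in the table follow a predictable convention: the raw log field name with dots and spaces replaced by underscores, plus a `_label` suffix. The snippet below is a hypothetical illustration of that convention only; it is not the Google SecOps parser, and `to_additional_field` is a name introduced here for the example.

```python
def to_additional_field(log_field, value):
    """Illustrative only: wrap a raw log field into the additional.fields
    label/value shape shown in the UDM mapping table above."""
    label = log_field.replace('.', '_').replace(' ', '_') + '_label'
    return {'key': label, 'value': {'string_value': str(value)}}


# 'history.source' maps to the history_source_label key,
# and 'History Status' maps to History_Status_label, as in the table.
field = to_additional_field('history.source', 'ui')
```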

Need more help? Get answers from Community members and Google SecOps professionals.
