Collect Menlo Security Isolation Platform (MSIP) logs
This guide explains how to ingest Menlo Security Isolation Platform (MSIP) logs into Google SecOps using Google Cloud Storage.

Menlo Security's isolation-centric approach splits web browsing and document retrieval between the user's device and an isolated Disposable Virtual Container (DVC) that runs away from the endpoint. All risky code executes in the isolated DVC and never reaches the endpoint; only safe display data is sent to the user's browser. The platform provides comprehensive logging for web access, audit events, email security, DLP violations, and SMTP activity.
Before you begin
Ensure that you have the following prerequisites:
- A Google SecOps instance
- Privileged access to Menlo Security Admin Portal
- Permissions to create API tokens in Menlo Security
- A GCP project with Cloud Storage, Cloud Run, Pub/Sub, and Cloud Scheduler APIs enabled
Configure Menlo Security API access
To enable the Cloud Run function to retrieve logs, you need to create an API token with the Log Export API permission.
Create API token
- Sign in to the Menlo Security Admin Portal at https://admin.menlosecurity.com.
- Go to Settings > Authentication > API Tokens.
- Click Create Token.
- In the Token Name field, enter a descriptive name (for example, Chronicle GCS Integration).
- In the Permissions section, select Log Export API.
- Click Create or Save.
- Copy and securely store the generated API token.
Important: The API token is displayed only once. Store it in a secure location immediately after creation.
Required API permissions
The API token requires the following permission:

| Permission/Scope | Access Level | Purpose |
|---|---|---|
| Log Export API | Read | Retrieve logs from Menlo Security API |
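To sanity-check the token before wiring up the full pipeline, you can build and inspect the request the collector will send. This is a minimal sketch: the endpoint path, `Token` authorization scheme, and `start`/`end` parameters are taken from the collector code later in this guide; confirm them against your Menlo tenant's API documentation.

```python
from datetime import datetime, timezone

def build_log_export_request(log_type, token, api_host, start, end):
    """Build the URL and headers for a Menlo Log Export API call.

    Mirrors the request shape used by the collector function later in
    this guide: a Token-style Authorization header and an RFC 3339
    start/end window in UTC.
    """
    url = (
        f"https://{api_host}/api/v1/logs/{log_type}"
        f"?start={start.strftime('%Y-%m-%dT%H:%M:%SZ')}"
        f"&end={end.strftime('%Y-%m-%dT%H:%M:%SZ')}"
    )
    headers = {
        'Authorization': f'Token {token}',
        'Accept': 'application/json',
    }
    return url, headers

url, headers = build_log_export_request(
    'web',
    'your-menlo-api-token',
    'logs.menlosecurity.com',
    datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc),
)
print(url)
# https://logs.menlosecurity.com/api/v1/logs/web?start=2024-01-01T00:00:00Z&end=2024-01-01T01:00:00Z
```

A `401` response to such a request indicates a bad token; a `403` usually means the token lacks the Log Export API permission.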
Create Google Cloud Storage bucket
- Go to the Google Cloud Console.
- Select your project or create a new one.
- In the navigation menu, go to Cloud Storage > Buckets.
- Click Create bucket.
- Provide the following configuration details:

  | Setting | Value |
  |---|---|
  | Name your bucket | Enter a globally unique name (for example, menlo-security-logs) |
  | Location type | Choose based on your needs (Region, Dual-region, Multi-region) |
  | Location | Select the location (for example, us-central1) |
  | Storage class | Standard (recommended for frequently accessed logs) |
  | Access control | Uniform (recommended) |
  | Protection tools | Optional: enable object versioning or a retention policy |

- Click Create.
Create service account for Cloud Run function
Create service account
- In the GCP Console, go to IAM & Admin > Service Accounts.
- Click Create Service Account.
- Provide the following configuration details:
  - Service account name: Enter menlo-logs-collector-sa
  - Service account description: Enter Service account for Cloud Run function to collect Menlo Security logs
- Click Create and Continue.
- In the Grant this service account access to project section, add the following roles:
  - Click Select a role.
  - Search for and select Storage Object Admin.
  - Click + Add another role.
  - Search for and select Cloud Run Invoker.
  - Click + Add another role.
  - Search for and select Cloud Functions Invoker.
- Click Continue.
- Click Done.
Grant IAM permissions on GCS bucket
- Go to Cloud Storage > Buckets.
- Click your bucket name (menlo-security-logs).
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
  - Add principals: Enter the service account email (menlo-logs-collector-sa@PROJECT_ID.iam.gserviceaccount.com)
  - Assign roles: Select Storage Object Admin
- Click Save.
Create Pub/Sub topic
- In the GCP Console, go to Pub/Sub > Topics.
- Click Create topic.
- Provide the following configuration details:
  - Topic ID: Enter menlo-logs-trigger
  - Leave other settings as default
- Click Create.
Create Cloud Run function to collect logs
- In the GCP Console, go to Cloud Run.
- Click Create service.
- Select Function (use an inline editor to create a function).
- In the Configure section, provide the following configuration details:

  | Setting | Value |
  |---|---|
  | Service name | menlo-logs-collector |
  | Region | Select a region matching your GCS bucket (for example, us-central1) |
  | Runtime | Select Python 3.12 or later |

- In the Trigger (optional) section:
  - Click + Add trigger.
  - Select Cloud Pub/Sub.
  - In Select a Cloud Pub/Sub topic, choose menlo-logs-trigger.
  - Click Save.
- In the Authentication section:
  - Select Require authentication.
  - Check Identity and Access Management (IAM).
- Scroll down and expand Containers, Networking, Security.
- Go to the Security tab:
  - Service account: Select menlo-logs-collector-sa
- Go to the Containers tab:
  - Click Variables & Secrets.
  - Click + Add variable for each environment variable:

    | Variable Name | Example Value | Description |
    |---|---|---|
    | GCS_BUCKET | menlo-security-logs | GCS bucket name |
    | GCS_PREFIX | menlo-logs | Prefix for log files |
    | STATE_KEY | menlo-logs/state.json | State file path |
    | MENLO_API_TOKEN | your-menlo-api-token | Menlo Security API token |
    | MENLO_API_HOST | logs.menlosecurity.com | Menlo Security API host |
    | MENLO_LOG_TYPES | web,audit,email,dlp | Comma-separated log types |
    | LOOKBACK_HOURS | 1 | Initial lookback period |

- In the Variables & Secrets section, scroll down to Requests:
  - Request timeout: Enter 600 seconds (10 minutes)
- Go to the Settings tab:
  - In the Resources section:
    - Memory: Select 512 MiB or higher
    - CPU: Select 1
  - In the Revision scaling section:
    - Minimum number of instances: Enter 0
    - Maximum number of instances: Enter 100
- Click Create.
- Wait for the service to be created (1-2 minutes).
- After the service is created, the inline code editor will open automatically.
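The collector you deploy next writes each batch of events as an NDJSON object under a date-partitioned path, which is what the Google SecOps feed later scans. As a quick orientation before pasting the code, this sketch reproduces that naming scheme; the helper name `object_key` is illustrative, not part of the deployed function.

```python
import uuid
from datetime import datetime, timezone

def object_key(prefix, log_type, now, file_id=None):
    """Compute the date-partitioned GCS object key the collector writes.

    Layout: <prefix>/YYYY/MM/DD/menlo-<log_type>-<uuid>.ndjson
    """
    file_id = file_id or uuid.uuid4()
    return f"{prefix}/{now:%Y/%m/%d}/menlo-{log_type}-{file_id}.ndjson"

key = object_key('menlo-logs', 'web',
                 datetime(2024, 1, 15, tzinfo=timezone.utc),
                 file_id='example')
print(key)  # menlo-logs/2024/01/15/menlo-web-example.ndjson
```

The random UUID suffix keeps concurrent or retried runs from overwriting each other's objects.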
Add function code
- Enter main in the Entry point field.
- In the inline code editor, create two files.

main.py:

```python
import functions_framework
from google.cloud import storage
import os
import json
import urllib3
import time
import uuid
from datetime import datetime, timezone, timedelta

http = urllib3.PoolManager(
    timeout=urllib3.Timeout(connect=5.0, read=60.0),
    retries=False,
)
storage_client = storage.Client()

GCS_BUCKET = os.environ.get('GCS_BUCKET')
GCS_PREFIX = os.environ.get('GCS_PREFIX', 'menlo-logs')
STATE_KEY = os.environ.get('STATE_KEY', 'menlo-logs/state.json')
API_TOKEN = os.environ.get('MENLO_API_TOKEN')
API_HOST = os.environ.get('MENLO_API_HOST', 'logs.menlosecurity.com')
LOG_TYPES = os.environ.get('MENLO_LOG_TYPES', 'web').split(',')
LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '1'))


@functions_framework.cloud_event
def main(cloud_event):
    if not all([GCS_BUCKET, API_TOKEN, API_HOST]):
        print('Error: Missing required environment variables')
        return

    try:
        bucket = storage_client.bucket(GCS_BUCKET)
        state = load_state(bucket)
        now = datetime.now(timezone.utc).replace(microsecond=0)

        if isinstance(state, dict) and state.get('last_event_time'):
            try:
                last_val = state['last_event_time']
                if last_val.endswith('Z'):
                    last_val = last_val[:-1] + '+00:00'
                last_time = datetime.fromisoformat(last_val)
                # Overlap the previous window by 2 minutes to avoid missing late events
                last_time = last_time - timedelta(minutes=2)
            except Exception as e:
                print(f"Warning: Could not parse last_event_time: {e}")
                last_time = now - timedelta(hours=LOOKBACK_HOURS)
        else:
            last_time = now - timedelta(hours=LOOKBACK_HOURS)

        print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")

        total = 0
        newest_time = None

        for log_type in LOG_TYPES:
            log_type = log_type.strip()
            print(f"Fetching {log_type} logs...")
            events = fetch_logs(log_type, API_TOKEN, API_HOST, last_time, now)

            if events:
                object_key = f"{GCS_PREFIX}/{now:%Y/%m/%d}/menlo-{log_type}-{uuid.uuid4()}.ndjson"
                blob = bucket.blob(object_key)
                ndjson = '\n'.join(
                    [json.dumps(ev, ensure_ascii=False) for ev in events]
                ) + '\n'
                blob.upload_from_string(ndjson, content_type='application/x-ndjson')
                print(f"Uploaded {len(events)} {log_type} events to gs://{GCS_BUCKET}/{object_key}")
                total += len(events)

                for ev in events:
                    ev_time = ev.get('timestamp') or ev.get('time') or ev.get('created_at')
                    if ev_time and (newest_time is None or str(ev_time) > str(newest_time)):
                        newest_time = str(ev_time)
            else:
                print(f"No {log_type} events found")

        save_state(bucket, newest_time if newest_time else now.isoformat())
        print(f"Successfully processed {total} total events")

    except Exception as e:
        print(f'Error processing logs: {str(e)}')
        raise


def fetch_logs(log_type, token, api_host, start, end):
    url = f"https://{api_host}/api/v1/logs/{log_type}"
    headers = {
        'Authorization': f'Token {token}',
        'Accept': 'application/json',
    }
    params = {
        'start': start.strftime('%Y-%m-%dT%H:%M:%SZ'),
        'end': end.strftime('%Y-%m-%dT%H:%M:%SZ'),
    }
    query = '&'.join(f'{k}={v}' for k, v in params.items())
    full_url = f'{url}?{query}'

    all_events = []
    backoff = 1.0

    while full_url:
        response = http.request('GET', full_url, headers=headers)

        if response.status == 429:
            retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
            print(f"Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)
            backoff = min(backoff * 2, 60.0)
            continue

        if response.status != 200:
            print(f"API error {response.status}: {response.data.decode('utf-8')}")
            break

        backoff = 1.0
        data = json.loads(response.data.decode('utf-8'))
        events = data if isinstance(data, list) else data.get('events', data.get('data', []))
        all_events.extend(events)
        full_url = data.get('next') if isinstance(data, dict) else None

    print(f"Total {log_type} events fetched: {len(all_events)}")
    return all_events


def load_state(bucket):
    try:
        blob = bucket.blob(STATE_KEY)
        if blob.exists():
            return json.loads(blob.download_as_text())
    except Exception as e:
        print(f"Warning: Could not load state: {e}")
    return {}


def save_state(bucket, last_event_time_iso):
    try:
        state = {
            'last_event_time': last_event_time_iso,
            'last_run': datetime.now(timezone.utc).isoformat()
        }
        blob = bucket.blob(STATE_KEY)
        blob.upload_from_string(
            json.dumps(state, indent=2),
            content_type='application/json'
        )
        print(f"Saved state: last_event_time={last_event_time_iso}")
    except Exception as e:
        print(f"Warning: Could not save state: {e}")
```

requirements.txt:

```
functions-framework==3.*
google-cloud-storage==2.*
urllib3>=2.0.0
```
- Click Deploy to save and deploy the function.
- Wait for deployment to complete (2-3 minutes).
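The function checkpoints progress in `state.json` so consecutive runs pick up where the last one ended. The windowing rule is worth understanding on its own: resume 2 minutes before the newest event seen (to tolerate late-arriving events), or fall back to `LOOKBACK_HOURS` on the first run or a corrupt checkpoint. This standalone sketch extracts that logic; the helper name `next_window_start` is illustrative and does not appear in the deployed code.

```python
from datetime import datetime, timezone, timedelta

def next_window_start(state, now, lookback_hours=1, overlap_minutes=2):
    """Compute where the next collection window should start.

    Mirrors the checkpointing in main.py: resume shortly before the
    newest event recorded in state.json, or fall back to the initial
    lookback when no usable checkpoint exists.
    """
    last_val = state.get('last_event_time') if isinstance(state, dict) else None
    if last_val:
        try:
            if last_val.endswith('Z'):
                last_val = last_val[:-1] + '+00:00'
            return datetime.fromisoformat(last_val) - timedelta(minutes=overlap_minutes)
        except ValueError:
            pass  # unparseable checkpoint: fall through to the lookback default
    return now - timedelta(hours=lookback_hours)

now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
print(next_window_start({}, now))
# 2024-01-15 11:00:00+00:00  (first run: full lookback)
print(next_window_start({'last_event_time': '2024-01-15T11:50:00Z'}, now))
# 2024-01-15 11:48:00+00:00  (resume with 2-minute overlap)
```

The 2-minute overlap means consecutive windows can re-fetch a few events; Google SecOps deduplicates on ingestion, so this trades a little redundancy for completeness.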
Create Cloud Scheduler job
- In the GCP Console, go to Cloud Scheduler.
- Click Create Job.
- Provide the following configuration details:

  | Setting | Value |
  |---|---|
  | Name | menlo-logs-collector-hourly |
  | Region | Select the same region as the Cloud Run function |
  | Frequency | 0 * * * * (every hour, on the hour) |
  | Timezone | Select timezone (UTC recommended) |
  | Target type | Pub/Sub |
  | Topic | Select menlo-logs-trigger |
  | Message body | {} (empty JSON object) |

- Click Create.
Test the integration
- In the Cloud Scheduler console, find your job (menlo-logs-collector-hourly).
- Click Force run to trigger the job manually.
- Wait a few seconds.
- Go to Cloud Run > Services.
- Click menlo-logs-collector.
- Click the Logs tab.
- Verify that the function executed successfully. Look for:

  ```
  Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
  Fetching web logs...
  Total web events fetched: X
  Uploaded X web events to gs://menlo-security-logs/menlo-logs/YYYY/MM/DD/menlo-web-UUID.ndjson
  Successfully processed X total events
  ```

- Go to Cloud Storage > Buckets.
- Click menlo-security-logs.
- Navigate to the menlo-logs/ folder.
- Verify that new .ndjson files were created with the current date.
If you see errors in the logs:
- HTTP 401: Verify the MENLO_API_TOKEN environment variable is correct
- HTTP 403: Verify the API token has the Log Export API permission
- HTTP 429: Rate limiting; the function automatically retries with backoff
- Missing environment variables: Verify all required variables are set in the Cloud Run function configuration
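If files appear in the bucket but events are not showing up in Google SecOps, it is worth confirming the objects are valid NDJSON (one standalone JSON record per line). This sketch validates a downloaded object's contents; the helper name `validate_ndjson` is illustrative.

```python
import json

def validate_ndjson(text):
    """Check that every non-empty line parses as a standalone JSON
    record, the format the collector uploads and the feed expects.

    Returns (record_count, list_of_bad_line_numbers).
    """
    bad = []
    count = 0
    for i, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            json.loads(line)
            count += 1
        except json.JSONDecodeError:
            bad.append(i)
    return count, bad

sample = (
    '{"event": "web_request", "url": "https://example.com"}\n'
    '{"event": "file_download", "sha256": "abc123"}\n'
)
print(validate_ndjson(sample))  # (2, [])
```

To check a real object, download it first (for example with `blob.download_as_text()` from the google-cloud-storage client) and pass the text to the helper.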
Retrieve the Google SecOps service account
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- Click Configure a single feed.
- In the Feed name field, enter a name for the feed (for example, Menlo Security MSIP Logs).
- Select Google Cloud Storage V2 as the Source type.
- Select Menlo Security as the Log type.
- Click Get Service Account. A unique service account email is displayed, for example: chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
- Copy this email address for use in the next step.
- Click Next.
- Specify values for the following input parameters:
  - Storage bucket URL: Enter the GCS bucket URI with the prefix path: gs://menlo-security-logs/menlo-logs/
  - Source deletion option: Select the deletion option according to your preference:
    - Never: Never deletes any files after transfers (recommended for testing).
    - Delete transferred files: Deletes files after successful transfer.
    - Delete transferred files and empty directories: Deletes files and empty directories after successful transfer.
  - Maximum File Age: Include files modified in the last number of days (default is 180 days)
  - Asset namespace: The asset namespace
  - Ingestion labels: The label to be applied to the events from this feed
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
Grant IAM permissions to the Google SecOps service account
- Go to Cloud Storage > Buckets.
- Click menlo-security-logs.
- Go to the Permissions tab.
- Click Grant access.
- Provide the following configuration details:
  - Add principals: Paste the Google SecOps service account email
  - Assign roles: Select Storage Object Viewer
- Click Save.
UDM mapping table
| Log Field | UDM Mapping | Logic |
|---|---|---|
| customer_name | intermediary.hostname | Set if customer_name is not an IP |
| customer_name | intermediary.ip | Extracted from customer_name if IP, or from xff_ip IPs |
| xff_ip | intermediary.ip | |
| event_data.event_time | metadata.event_timestamp | Parsed using grok and date filter |
| request_type | metadata.event_type | Conditional logic based on these fields |
| has_principal | metadata.event_type | |
| has_top_url | metadata.event_type | |
| has_dest | metadata.event_type | |
| log_type | metadata.log_type | Value copied directly |
| event_data.name | metadata.product_event_type | Value copied directly |
| reqId | metadata.product_log_id | Value copied directly |
| event_data.version | metadata.product_version | Value copied directly |
| event_data.protocol | network.application_protocol | Mapped using predefined protocol list |
| request_type | network.http.method | Value copied directly |
| referer | network.http.referral_url | Value copied directly |
| response_code | network.http.response_code | Converted to integer |
| event_data.user-agent | network.http.user_agent | Value copied directly |
| full_session_id | network.session_id | Value copied directly |
| browser_and_version | principal.application | Value copied directly |
| event_data.x-client-ip | principal.asset.ip | Value from event_data.x-client-ip if not empty, else src_ip |
| src_ip | principal.asset.ip | |
| groups | principal.group.product_object_id | Value copied directly |
| event_data.x-client-ip | principal.ip | Value from event_data.x-client-ip if not empty, else src_ip |
| src_ip | principal.ip | |
| region | principal.location.country_or_region | Value from region if not empty, else egress_country |
| egress_country | principal.location.country_or_region | |
| region_name | principal.location.name | Value copied directly |
| egress_ip | principal.nat_ip | Value copied directly |
| event_data.userid | principal.user.email_addresses | Merged if matches email regex |
| event_data.userid | principal.user.userid | Value copied directly |
| event_data.domain | security_result.about.administrative_domain | Value copied directly |
| event_data.url | security_result.about.url | Value copied directly |
| event_data.pe_action | security_result.action | Mapped to ALLOW, BLOCK, or ALLOW_WITH_MODIFICATION |
| event_data.categories | security_result.category_details | Merged from event_data.categories (extracted or direct) and casb_cat_name |
| casb_cat_name | security_result.category_details | |
| event_data.risk_score | security_result.confidence_details | Value copied directly |
| content-type | security_result.detection_fields | Merged from various label fields |
| product | security_result.detection_fields | |
| threats | security_result.detection_fields | |
| virusDetails | security_result.detection_fields | |
| sandboxResult | security_result.detection_fields | |
| fullScanResult | security_result.detection_fields | |
| tab_id | security_result.detection_fields | |
| pe_reason | security_result.detection_fields | |
| ua_type | security_result.detection_fields | |
| email_isolation_state | security_result.detection_fields | |
| document_url | security_result.detection_fields | |
| archive_path | security_result.detection_fields | |
| casb_app_name | security_result.detection_fields | |
| casb_fun_name | security_result.detection_fields | |
| casb_org_name | security_result.detection_fields | |
| casb_profile_id | security_result.detection_fields | |
| casb_profile_name | security_result.detection_fields | |
| casb_profile_type | security_result.detection_fields | |
| casb_risk_score | security_result.detection_fields | |
| connId | security_result.detection_fields | |
| origin_country | security_result.detection_fields | |
| egress_country | security_result.detection_fields | |
| is_casb_ddl | security_result.detection_fields | |
| is_iframe | security_result.detection_fields | |
| parent_file_id | security_result.detection_fields | |
| parent_filename | security_result.detection_fields | |
| parent_sha256 | security_result.detection_fields | |
| parent_tid | security_result.detection_fields | |
| root_file_id | security_result.detection_fields | |
| root_filename | security_result.detection_fields | |
| root_sha256 | security_result.detection_fields | |
| casb_risk_score | security_result.risk_score | Converted to float |
| casb_profile_id | security_result.rule_id | Value copied directly |
| pe_rulename | security_result.rule_name | Value copied directly |
| casb_profile_name | security_result.rule_set | Value copied directly |
| casb_profile_type | security_result.rule_type | Value copied directly |
| event_data.severity | security_result.severity_details | Value copied directly |
| magicName | security_result.summary | Concatenated as "%{magicName} is %{threats}." |
| threats | security_result.summary | |
| threat_types | security_result.threat_name | Value copied directly |
| origin_ip | src.ip | Value copied directly |
| origin_country | src.location.country_or_region | Value copied directly |
| event_data.dst | target.asset.ip | Extracted from event_data.dst or dst using grok IP pattern |
| dst | target.asset.ip | |
| filename | target.file.full_path | Value copied directly |
| mimeType | target.file.mime_type | Value copied directly |
| sha256 | target.file.sha256 | Value copied directly |
| file_size | target.file.size | Converted to float then uinteger |
| event_data.dst | target.ip | Extracted from event_data.dst or dst using grok IP pattern |
| dst | target.ip | |
| top_url | target.url | Extracted using URI grok pattern |
| metadata.product_name | metadata.product_name | Set to "MENLO SECURITY" |
| metadata.vendor_name | metadata.vendor_name | Set to "MENLO SECURITY" |
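Some rows in the table encode conditional logic rather than a direct copy. For instance, customer_name lands in intermediary.ip when it holds an IP address and in intermediary.hostname otherwise. This simplified sketch illustrates that one rule; it is not the actual parser, just an executable restatement of the table entry.

```python
import ipaddress

def map_customer_name(customer_name):
    """Illustrate the customer_name rule from the UDM mapping table:
    IP values map to intermediary.ip, anything else to
    intermediary.hostname.
    """
    try:
        ipaddress.ip_address(customer_name)  # raises ValueError if not an IP
        return {'intermediary.ip': customer_name}
    except ValueError:
        return {'intermediary.hostname': customer_name}

print(map_customer_name('acme-corp'))    # {'intermediary.hostname': 'acme-corp'}
print(map_customer_name('203.0.113.7'))  # {'intermediary.ip': '203.0.113.7'}
```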
Need more help? Get answers from Community members and Google SecOps professionals.