Collect Cisco vManage SD-WAN logs
This document explains how to ingest Cisco vManage SD-WAN logs to Google Security Operations using Amazon S3.
Before you begin
Make sure you have the following prerequisites:
- A Google SecOps instance
- Privileged access to the Cisco vManage SD-WAN management console
- Privileged access to AWS (S3, IAM, Lambda, and EventBridge)
Collect Cisco vManage SD-WAN prerequisites (credentials and base URL)
- Sign in to the Cisco vManage Management Console.
- Go to Administration > Settings > Users.
- Create a new user or use an existing admin user with API access privileges.
- Copy and save in a secure location the following details:
- Username
- Password
- vManage Base URL (for example, https://your-vmanage-server:8443)
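You can sanity-check the saved credentials before building the pipeline. The sketch below (placeholder URL and credentials; the helper name is ours) builds the form-encoded login request that vManage expects at /j_security_check, which is the same request the Lambda collector sends later:

```python
from urllib.parse import urlencode

def build_login_request(base_url: str, username: str, password: str):
    """Build the form POST that vManage expects at /j_security_check.

    A successful login sets a JSESSIONID cookie; a failed login returns
    an HTML login page instead of an empty body.
    """
    url = f"{base_url.rstrip('/')}/j_security_check"
    body = urlencode({"j_username": username, "j_password": password})
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return url, body, headers

# Placeholder values; substitute the credentials you saved above.
url, body, headers = build_login_request(
    "https://your-vmanage-server:8443", "api-user", "api-password")
print(url)  # https://your-vmanage-server:8443/j_security_check
```

Sending this request with any HTTP client (and then fetching /dataservice/client/token with the returned cookie) confirms the account has API access.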
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, cisco-sdwan-logs-bucket).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
- Enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutObjects",
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/state.json"
    }
  ]
}
```

- Replace cisco-sdwan-logs-bucket if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles.
- Click Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role cisco-sdwan-lambda-role and click Create role.
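If you automate your AWS setup, the policy document can be generated rather than hand-edited per bucket. This is a sketch only (the build_upload_policy helper is ours, not part of the procedure); it renders the same two-statement policy for an arbitrary bucket name:

```python
import json

def build_upload_policy(bucket: str, state_key: str = "cisco-sdwan/state.json") -> str:
    """Render the Lambda upload policy shown above for a given bucket name."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                # Allows writing collected logs anywhere in the bucket
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                # Read access is limited to the single state-tracking object
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }
    return json.dumps(policy, indent=2)

print(build_upload_policy("cisco-sdwan-logs-bucket"))
```

Paste the output into the JSON tab of the policy editor, or pass it to IAM's CreatePolicy API.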
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:
  - Name: cisco-sdwan-log-collector
  - Runtime: Python 3.13
  - Architecture: x86_64
  - Execution role: cisco-sdwan-lambda-role
- After the function is created, open the Code tab, delete the stub, and enter the following code (cisco-sdwan-log-collector.py):

```python
import json
import os
import urllib.parse
from datetime import datetime, timezone

import boto3
import urllib3
from botocore.exceptions import ClientError

# Disable SSL warnings for self-signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Environment variables
VMANAGE_HOST = os.environ['VMANAGE_HOST']
VMANAGE_USERNAME = os.environ['VMANAGE_USERNAME']
VMANAGE_PASSWORD = os.environ['VMANAGE_PASSWORD']
S3_BUCKET = os.environ['S3_BUCKET']
S3_PREFIX = os.environ['S3_PREFIX']
STATE_KEY = os.environ['STATE_KEY']

s3_client = boto3.client('s3')
http = urllib3.PoolManager(cert_reqs='CERT_NONE')


class VManageAPI:
    def __init__(self, host, username, password):
        self.host = host.rstrip('/')
        self.username = username
        self.password = password
        self.cookies = None
        self.token = None

    def authenticate(self):
        """Authenticate with vManage and get session tokens"""
        try:
            # Login to get JSESSIONID
            login_url = f"{self.host}/j_security_check"
            login_data = urllib.parse.urlencode({
                'j_username': self.username,
                'j_password': self.password
            })
            response = http.request(
                'POST',
                login_url,
                body=login_data,
                headers={'Content-Type': 'application/x-www-form-urlencoded'},
                timeout=30
            )

            # Check if login was successful (vManage returns HTML on failure)
            if b'<html>' in response.data or response.status != 200:
                raise Exception("Authentication failed")

            # Extract cookies
            self.cookies = {}
            if 'Set-Cookie' in response.headers:
                cookie_header = response.headers['Set-Cookie']
                for cookie in cookie_header.split(';'):
                    if 'JSESSIONID=' in cookie:
                        self.cookies['JSESSIONID'] = cookie.split('JSESSIONID=')[1].split(';')[0]
                        break

            if not self.cookies.get('JSESSIONID'):
                raise Exception("Failed to get JSESSIONID")

            # Get XSRF token
            token_url = f"{self.host}/dataservice/client/token"
            headers = {
                'Content-Type': 'application/json',
                'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}"
            }
            response = http.request('GET', token_url, headers=headers, timeout=30)
            if response.status == 200:
                self.token = response.data.decode('utf-8')
                return True
            raise Exception(f"Failed to get XSRF token: {response.status}")
        except Exception as e:
            print(f"Authentication error: {e}")
            return False

    def get_headers(self):
        """Get headers for API requests"""
        return {
            'Content-Type': 'application/json',
            'Cookie': f"JSESSIONID={self.cookies['JSESSIONID']}",
            'X-XSRF-TOKEN': self.token
        }

    def _query(self, endpoint, log_name, last_timestamp=None):
        """Query a vManage dataservice endpoint for recent records"""
        try:
            url = f"{self.host}{endpoint}"
            query = {
                "query": {"condition": "AND", "rules": []},
                "size": 10000
            }
            if last_timestamp:
                # Convert timestamp to epoch milliseconds for the vManage API
                if isinstance(last_timestamp, str):
                    try:
                        dt = datetime.fromisoformat(last_timestamp.replace('Z', '+00:00'))
                        epoch_ms = int(dt.timestamp() * 1000)
                    except Exception:
                        epoch_ms = int(last_timestamp)
                else:
                    epoch_ms = int(last_timestamp)
                query["query"]["rules"].append({
                    "value": [str(epoch_ms)],
                    "field": "entry_time",
                    "type": "date",
                    "operator": "greater"
                })
            else:
                # Get the last 1 hour of records by default
                query["query"]["rules"].append({
                    "value": ["1"],
                    "field": "entry_time",
                    "type": "date",
                    "operator": "last_n_hours"
                })

            response = http.request(
                'POST',
                url,
                body=json.dumps(query),
                headers=self.get_headers(),
                timeout=60
            )
            if response.status == 200:
                return json.loads(response.data.decode('utf-8'))
            print(f"Failed to get {log_name}: {response.status}")
            return None
        except Exception as e:
            print(f"Error getting {log_name}: {e}")
            return None

    def get_audit_logs(self, last_timestamp=None):
        """Get audit logs from vManage"""
        return self._query('/dataservice/auditlog', 'audit logs', last_timestamp)

    def get_alarms(self, last_timestamp=None):
        """Get alarms from vManage"""
        return self._query('/dataservice/alarms', 'alarms', last_timestamp)

    def get_events(self, last_timestamp=None):
        """Get events from vManage"""
        return self._query('/dataservice/events', 'events', last_timestamp)


def get_last_run_time():
    """Get the last successful run timestamp from S3"""
    try:
        response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
        state_data = json.loads(response['Body'].read())
        return state_data.get('last_run_time')
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            print("No previous state found, collecting last hour of logs")
            return None
        print(f"Error reading state: {e}")
        return None
    except Exception as e:
        print(f"Error reading state: {e}")
        return None


def update_last_run_time(timestamp):
    """Update the last successful run timestamp in S3"""
    try:
        state_data = {
            'last_run_time': timestamp,
            'updated_at': datetime.now(timezone.utc).isoformat()
        }
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
        print(f"Updated state with timestamp: {timestamp}")
    except Exception as e:
        print(f"Error updating state: {e}")


def upload_logs_to_s3(logs_data, log_type, timestamp):
    """Upload logs to the S3 bucket"""
    try:
        if not logs_data or 'data' not in logs_data or not logs_data['data']:
            print(f"No {log_type} data to upload")
            return
        # Create a date-partitioned filename with timestamp
        dt = datetime.now(timezone.utc)
        filename = (f"{S3_PREFIX}{log_type}/{dt.strftime('%Y/%m/%d')}/"
                    f"{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json")
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=filename,
            Body=json.dumps(logs_data),
            ContentType='application/json'
        )
        print(f"Uploaded {len(logs_data['data'])} {log_type} records "
              f"to s3://{S3_BUCKET}/{filename}")
    except Exception as e:
        print(f"Error uploading {log_type} to S3: {e}")


def lambda_handler(event, context):
    """Main Lambda handler function"""
    print(f"Starting Cisco vManage log collection at {datetime.now(timezone.utc)}")
    try:
        # Get last run time
        last_run_time = get_last_run_time()

        # Initialize vManage API client and authenticate
        vmanage = VManageAPI(VMANAGE_HOST, VMANAGE_USERNAME, VMANAGE_PASSWORD)
        if not vmanage.authenticate():
            return {
                'statusCode': 500,
                'body': json.dumps('Failed to authenticate with vManage')
            }
        print("Successfully authenticated with vManage")

        # Current timestamp for state tracking (stored as epoch milliseconds)
        current_time = int(datetime.now(timezone.utc).timestamp() * 1000)

        # Collect different types of logs
        log_types = [
            ('audit_logs', vmanage.get_audit_logs),
            ('alarms', vmanage.get_alarms),
            ('events', vmanage.get_events)
        ]
        total_records = 0
        for log_type, get_function in log_types:
            try:
                print(f"Collecting {log_type}...")
                logs_data = get_function(last_run_time)
                if logs_data:
                    upload_logs_to_s3(logs_data, log_type, current_time)
                    if 'data' in logs_data:
                        total_records += len(logs_data['data'])
            except Exception as e:
                print(f"Error processing {log_type}: {e}")
                continue

        # Update state with current timestamp
        update_last_run_time(current_time)
        print(f"Collection completed. Total records processed: {total_records}")
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Log collection completed successfully',
                'total_records': total_records,
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
        }
    except Exception as e:
        print(f"Lambda execution error: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }
```
- Go to Configuration > Environment variables.
- Click Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own:
  - S3_BUCKET: cisco-sdwan-logs-bucket
  - S3_PREFIX: cisco-sdwan/
  - STATE_KEY: cisco-sdwan/state.json
  - VMANAGE_HOST: https://your-vmanage-server:8443
  - VMANAGE_USERNAME: your-vmanage-username
  - VMANAGE_PASSWORD: your-vmanage-password
- After the function is created, stay on its page (or open Lambda > Functions > cisco-sdwan-log-collector).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
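When you later browse the bucket, the collector's output is organized under date-partitioned keys below S3_PREFIX. The helper below mirrors the key format that upload_logs_to_s3 produces (the name build_object_key is ours, for illustration only):

```python
from datetime import datetime, timezone

def build_object_key(prefix: str, log_type: str, dt: datetime) -> str:
    """Mirror the S3 key layout the collector writes:
    <prefix><log_type>/YYYY/MM/DD/<log_type>_YYYYMMDD_HHMMSS.json"""
    return (f"{prefix}{log_type}/{dt.strftime('%Y/%m/%d')}/"
            f"{log_type}_{dt.strftime('%Y%m%d_%H%M%S')}.json")

key = build_object_key(
    "cisco-sdwan/", "audit_logs",
    datetime(2025, 1, 31, 12, 0, 0, tzinfo=timezone.utc))
print(key)  # cisco-sdwan/audit_logs/2025/01/31/audit_logs_20250131_120000.json
```

The date partitions make it easy to spot-check that hourly runs are landing, and they match the cisco-sdwan/ prefix the feed reads from.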
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Recurring schedule: Rate (1 hour)
  - Target: your Lambda function cisco-sdwan-log-collector
  - Name: cisco-sdwan-log-collector-1h
- Click Create schedule.
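If you prefer to create the schedule programmatically, the console settings above map onto EventBridge Scheduler's CreateSchedule API. The sketch below only builds the request parameters (the ARNs are placeholders; the invoke role is one you must create separately and is not part of the console flow above):

```python
def build_schedule_request(function_arn: str, role_arn: str) -> dict:
    """Parameters for EventBridge Scheduler's CreateSchedule call,
    mirroring the console settings above. role_arn must allow
    EventBridge Scheduler to invoke the Lambda function."""
    return {
        "Name": "cisco-sdwan-log-collector-1h",
        "ScheduleExpression": "rate(1 hour)",
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {"Arn": function_arn, "RoleArn": role_arn},
    }

# Placeholder ARNs; pass to boto3.client("scheduler").create_schedule(**params)
params = build_schedule_request(
    "arn:aws:lambda:us-east-1:123456789012:function:cisco-sdwan-log-collector",
    "arn:aws:iam::123456789012:role/scheduler-invoke-role",
)
print(params["ScheduleExpression"])  # rate(1 hour)
```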
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users > Add users.
- Click Add users.
- Provide the following configuration details:
  - User: secops-reader
  - Access type: Access key — Programmatic access
- Click Create user.
- Attach a minimal read-only policy (custom): go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket",
      "Condition": {
        "StringLike": {
          "s3:prefix": ["cisco-sdwan/*"]
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::cisco-sdwan-logs-bucket/cisco-sdwan/*"
    }
  ]
}
```

- Name the policy secops-reader-policy.
- Click Create policy.
- Go back to the user creation flow, then search for and select secops-reader-policy.
- Click Next: Tags.
- Click Next: Review.
- Click Create user.
- Download the CSV file (these values are entered into the feed).
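The s3:prefix condition in the read-only policy is what keeps secops-reader scoped to the collector's output. As a rough mental model (our approximation, not IAM itself), StringLike behaves like shell-style globbing where * matches any characters, which Python's fnmatch can imitate:

```python
import fnmatch

# Patterns from the s3:prefix Condition in the policy above
READ_PREFIX_PATTERNS = ["cisco-sdwan/*"]

def prefix_allowed(key_prefix: str) -> bool:
    """Approximate the StringLike check IAM applies to s3:prefix
    when evaluating a ListBucket request."""
    return any(fnmatch.fnmatch(key_prefix, p) for p in READ_PREFIX_PATTERNS)

print(prefix_allowed("cisco-sdwan/audit_logs/"))  # True
print(prefix_allowed("other-data/"))              # False
```

Listing requests outside the cisco-sdwan/ prefix are denied, and GetObject is similarly limited to objects under that prefix.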
Configure a feed in Google SecOps to ingest Cisco vManage SD-WAN logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, Cisco SD-WAN logs).
- Select Amazon S3 V2 as the Source type.
- Select Cisco vManage SD-WAN as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: s3://cisco-sdwan-logs-bucket/cisco-sdwan/
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Include files modified in the last number of days. Default is 180 days.
  - Access Key ID: User access key with access to the S3 bucket.
  - Secret Access Key: User secret key with access to the S3 bucket.
  - Asset namespace: The asset namespace.
  - Ingestion labels: The label applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
Need more help? Get answers from Community members and Google SecOps professionals.

