Collect Cisco Application Centric Infrastructure (ACI) logs

Supported in:

This document explains how to ingest Cisco Application Centric Infrastructure (ACI) logs to Google Security Operations. The parser first attempts to process incoming Cisco ACI logs as syslog messages using Grok patterns. If the syslog parsing fails, it assumes the message is in JSON format and parses it accordingly. Finally, it maps the extracted fields to the unified data model (UDM).

This integration supports two methods:

  • Option 1: Syslog format via Bindplane agent
  • Option 2: JSON format via AWS S3 using APIC REST API

Each option is self-contained and can be implemented independently based on your infrastructure requirements and log format preferences.

Option 1: Syslog via Bindplane agent

This option configures Cisco ACI fabric to send syslog messages to a Bindplane agent, which forwards them to Chronicle for analysis.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • A Windows 2016 or later, or a Linux host with systemd
  • If running behind a proxy, ensure firewall ports are open per the Bindplane agent requirements
  • Privileged access to the Cisco APIC console

Get Google SecOps ingestion authentication file

  1. Sign in to the Google SecOps console.
  2. Go to SIEM Settings > Collection Agents.
  3. Download the Ingestion Authentication File. Save the file securely on the system where Bindplane will be installed.

Get Google SecOps customer ID

  1. Sign in to the Google SecOps console.
  2. Go to SIEM Settings > Profile.
  3. Copy and save the Customer ID from the Organization Details section.

Install the Bindplane agent

Install the Bindplane agent on your Windows or Linux operating system according to the following instructions.

Windows installation

  1. Open the Command Prompt or PowerShell as an administrator.
  2. Run the following command:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet

Linux installation

  1. Open a terminal with root or sudo privileges.
  2. Run the following command:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh

Additional installation resources

Configure the Bindplane agent to ingest Syslog and send to Google SecOps

  1. Access the configuration file:

    1. Locate the config.yaml file. Typically, it's in the /etc/bindplane-agent/ directory on Linux or in the installation directory on Windows.
    2. Open the file using a text editor (for example, nano, vi, or Notepad).
  2. Edit the config.yaml file as follows:

    receivers:
      udplog:
        # Replace the port and IP address as required
        listen_address: "0.0.0.0:514"

    exporters:
      chronicle/chronicle_w_labels:
        compression: gzip
        # Adjust the path to the credentials file you downloaded in Step 1
        creds_file_path: '/path/to/ingestion-authentication-file.json'
        # Replace with your actual customer ID from Step 2
        customer_id: <CUSTOMER_ID>
        endpoint: malachiteingestion-pa.googleapis.com
        # Add optional ingestion labels for better organization
        log_type: 'CISCO_ACI'
        raw_log_field: body
        ingestion_labels:

    service:
      pipelines:
        logs/source0__chronicle_w_labels-0:
          receivers:
            - udplog
          exporters:
            - chronicle/chronicle_w_labels
    • Replace the port and IP address as required in your infrastructure.
    • Replace <CUSTOMER_ID> with your actual customer ID.
    • Update /path/to/ingestion-authentication-file.json to the path where the authentication file was saved in the Get Google SecOps ingestion authentication file section.

Restart the Bindplane agent to apply the changes

  • To restart the Bindplane agent in Linux, run the following command:

    sudo systemctl restart bindplane-agent
  • To restart the Bindplane agent in Windows, you can either use the Services console or enter the following command:

     net stop BindPlaneAgent && net start BindPlaneAgent 
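
After restarting the agent, you can optionally confirm that the udplog receiver is reachable before you configure the ACI fabric. The following minimal sketch sends a single test message over UDP; the agent host IP and the message text are placeholders for illustration only.

    import socket

    # Placeholder values: replace with your Bindplane agent host and the listener port from config.yaml
    BINDPLANE_HOST = "10.0.0.10"
    BINDPLANE_PORT = 514

    # A minimal RFC 3164-style test line; real Cisco ACI messages will look different
    test_message = "<190>Jan  1 00:00:00 apic1 test: Bindplane connectivity check"

    # Send the message over UDP; if forwarding works, it should appear as a CISCO_ACI raw log
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(test_message.encode("utf-8"), (BINDPLANE_HOST, BINDPLANE_PORT))
    sock.close()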
    

Configure Syslog forwarding on Cisco ACI

Configure Out-of-Band Management Contract

  1. Sign in to the Cisco APIC console.
  2. Go to Tenants > mgmt > Contracts > Filters.
  3. Click Create Filter.
  4. Provide the following configuration details:
    • Name: Enter syslog-udp-514.
    • Entry Name: Enter syslog.
    • EtherType: Select IP.
    • IP Protocol: Select UDP.
    • Destination Port Range From: Enter 514.
    • Destination Port Range To: Enter 514.
  5. Click Submit.

Create Management Contract

  1. Go to Tenants > mgmt > Contracts > Standard.
  2. Click Create Contract.
  3. Provide the following configuration details:
    • Name: Enter mgmt-syslog-contract.
    • Scope: Select Context.
  4. Click Submit.
  5. Expand the contract and click Subjects.
  6. Click Create Contract Subject.
  7. Provide the following configuration details:
    • Name: Enter syslog-subject.
    • Apply Both Directions: Check this option.
  8. Click Submit.
  9. Expand the subject and click Filters.
  10. Click Create Filter Binding.
  11. Select the syslog-udp-514 filter created earlier.
  12. Click Submit.

Configure Syslog Destination Group

  1. Go to Admin > External Data Collectors > Monitoring Destinations > Syslog.
  2. Right-click Syslog and select Create Syslog Monitoring Destination Group.
  3. Provide the following configuration details:
    • Name: Enter Chronicle-Syslog-Group.
    • Admin State: Select Enabled.
    • Format: Select aci.
  4. Click Next.
  5. In the Create Syslog Monitoring Destination dialog:
    • Name: Enter Chronicle-BindPlane.
    • Host: Enter the IP address of your Bindplane agent server.
    • Port: Enter 514.
    • Admin State: Select Enabled.
    • Severity: Select information (to capture detailed logs).
  6. Click Submit.

Configure Monitoring Policies

Fabric Monitoring Policy

  1. Go to Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Expand Callhome/Smart Callhome/SNMP/Syslog/TACACS.
  3. Right-click Syslog and select Create Syslog Source.
  4. Provide the following configuration details:
    • Name: Enter Chronicle-Fabric-Syslog.
    • Audit Logs: Check to include audit events.
    • Events: Check to include system events.
    • Faults: Check to include fault events.
    • Session Logs: Check to include session logs.
    • Destination Group: Select Chronicle-Syslog-Group.
  5. Click Submit.

Access Monitoring Policy

  1. Go to Fabric > Access Policies > Policies > Monitoring > Default Policy.
  2. Expand Callhome/Smart Callhome/SNMP/Syslog.
  3. Right-click Syslog and select Create Syslog Source.
  4. Provide the following configuration details:
    • Name: Enter Chronicle-Access-Syslog.
    • Audit Logs: Check to include audit events.
    • Events: Check to include system events.
    • Faults: Check to include fault events.
    • Session Logs: Check to include session logs.
    • Destination Group: Select Chronicle-Syslog-Group.
  5. Click Submit.

Configure System Syslog Messages Policy

  1. Go to Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Expand Syslog Messages Policies.
  3. Click default.
  4. In the Facility Filter section:
    • Facility: Select default.
    • Minimum Severity: Change to information.
  5. Click Submit.

Option 2: JSON via AWS S3

This option uses the APIC REST API to collect JSON-formatted events, faults, and audit logs from Cisco ACI fabric and stores them in AWS S3 for SecOps ingestion.

Before you begin

  • Google SecOps instance.
  • Privileged access to Cisco APIC console.
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge).

Collect Cisco ACI APIC prerequisites (IDs, API keys, org IDs, tokens)

  1. Sign in to the Cisco APIC console using HTTPS.
  2. Go to Admin > AAA (on APIC 6.0+) or Admin > Authentication > AAA (on older releases).
    • Note: The AAA menu path changed starting in APIC 6.0(1).
  3. Create or use an existing local user with appropriate privileges.
  4. Copy and save in a secure location the following details:
    • APIC Username: Local user with read access to monitoring data
    • APIC Password: User password
    • APIC URL: The HTTPS URL of your APIC (for example, https://apic.example.com)
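
Before entering these credentials in AWS, you can optionally verify them against the APIC REST API. This minimal sketch calls the same aaaLogin endpoint used by the Lambda function later in this guide; the URL, username, and password are placeholders, and you may need to adjust TLS verification if your APIC uses a self-signed certificate.

    import json
    import urllib3

    # Placeholder values: replace with the details saved above
    APIC_URL = "https://apic.example.com"
    APIC_USERNAME = "<your-apic-username>"
    APIC_PASSWORD = "<your-apic-password>"

    http = urllib3.PoolManager()

    # POST to aaaLogin; a 200 response containing a token confirms the credentials work
    login_body = {"aaaUser": {"attributes": {"name": APIC_USERNAME, "pwd": APIC_PASSWORD}}}
    response = http.request(
        "POST",
        f"{APIC_URL}/api/aaaLogin.json",
        body=json.dumps(login_body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        timeout=30,
    )

    data = json.loads(response.data.decode("utf-8"))
    token = data["imdata"][0]["aaaLogin"]["attributes"]["token"]
    print(f"Authenticated to APIC, token prefix: {token[:8]}...")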

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create an Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save the bucket Name and Region for future reference (for example, cisco-aci-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select the Security credentials tab.
  6. Click Create Access Key in the Access Keys section.
  7. Select Third-party service as the Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select the Permissions tab.
  14. Click Add permissions in the Permissions policies section.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for the AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::cisco-aci-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::cisco-aci-logs/cisco-aci-events/state.json"
        }
      ]
    }
    • Replace cisco-aci-logs if you entered a different bucket name.
  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy and the AWSLambdaBasicExecutionRole managed policy.

  7. Name the role cisco-aci-lambda-role and click Create role.
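
If you want to sanity-check the new policy before wiring it into the function, a short boto3 sketch such as the one below can exercise both statements. It assumes it runs with credentials that have the policy attached (for example, by assuming cisco-aci-lambda-role) and that the bucket name matches the one above; the test object key is a placeholder.

    import boto3
    from botocore.exceptions import ClientError

    # Placeholder bucket name: replace if you used a different one
    BUCKET = "cisco-aci-logs"

    s3 = boto3.client("s3")

    # PutObject is allowed anywhere under the bucket by the policy above
    s3.put_object(Bucket=BUCKET, Key="cisco-aci-events/permission-check.txt", Body=b"ok")
    print("PutObject succeeded")

    # GetObject is only granted on the state file; NoSuchKey just means it has not been written yet
    try:
        s3.get_object(Bucket=BUCKET, Key="cisco-aci-events/state.json")
        print("GetObject on state.json succeeded")
    except ClientError as err:
        if err.response["Error"]["Code"] == "NoSuchKey":
            print("State file not created yet, but access was not denied")
        else:
            raise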

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    • Name: cisco-aci-events-collector
    • Runtime: Python 3.13
    • Architecture: x86_64
    • Execution role: cisco-aci-lambda-role
  4. After the function is created, open the Code tab, delete the stub, and enter the following code (cisco-aci-events-collector.py):

    import json
    import boto3
    import urllib3
    import base64
    from datetime import datetime, timedelta
    import os
    import logging

    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # AWS S3 client and HTTP pool manager
    s3_client = boto3.client('s3')
    http = urllib3.PoolManager()


    def lambda_handler(event, context):
        """
        AWS Lambda handler to fetch Cisco ACI events, faults, and audit logs and store them in S3
        """
        try:
            # Get environment variables
            s3_bucket = os.environ['S3_BUCKET']
            s3_prefix = os.environ['S3_PREFIX']
            state_key = os.environ['STATE_KEY']
            apic_url = os.environ['APIC_URL']
            apic_username = os.environ['APIC_USERNAME']
            apic_password = os.environ['APIC_PASSWORD']

            # Optional parameters
            page_size = int(os.environ.get('PAGE_SIZE', '100'))
            max_pages = int(os.environ.get('MAX_PAGES', '10'))

            logger.info(f"Starting Cisco ACI data collection for bucket: {s3_bucket}")

            # Get last run timestamp from state file
            last_timestamp = get_last_timestamp(s3_bucket, state_key)
            if not last_timestamp:
                last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'

            # Authenticate to APIC
            session_token = authenticate_apic(apic_url, apic_username, apic_password)
            headers = {
                'Cookie': f'APIC-cookie={session_token}',
                'Accept': 'application/json',
                'Content-Type': 'application/json'
            }

            # Data types to collect
            data_types = ['faultInst', 'eventRecord', 'aaaModLR']
            all_collected_data = []

            for data_type in data_types:
                logger.info(f"Collecting {data_type} data")
                collected_data = collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages)

                # Tag each record with its type
                for record in collected_data:
                    record['_data_type'] = data_type

                all_collected_data.extend(collected_data)
                logger.info(f"Collected {len(collected_data)} {data_type} records")

            logger.info(f"Total records collected: {len(all_collected_data)}")

            # Store data in S3 if any were collected
            if all_collected_data:
                timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
                s3_key = f"{s3_prefix}cisco_aci_events_{timestamp_str}.ndjson"

                # Convert to NDJSON format (one JSON object per line)
                ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data)

                # Upload to S3
                s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=s3_key,
                    Body=ndjson_content.encode('utf-8'),
                    ContentType='application/x-ndjson'
                )
                logger.info(f"Uploaded {len(all_collected_data)} records to s3://{s3_bucket}/{s3_key}")

            # Update state file with latest timestamp from collected data
            latest_timestamp = get_latest_timestamp_from_records(all_collected_data)
            if not latest_timestamp:
                latest_timestamp = datetime.utcnow().isoformat() + 'Z'
            update_state(s3_bucket, state_key, latest_timestamp)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Success',
                    'total_records_collected': len(all_collected_data),
                    'data_types_collected': data_types
                })
            }

        except Exception as e:
            logger.error(f"Error in lambda_handler: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)})
            }


    def authenticate_apic(apic_url, username, password):
        """
        Authenticate to APIC and return session token
        """
        login_url = f"{apic_url}/api/aaaLogin.json"
        login_data = {
            "aaaUser": {
                "attributes": {
                    "name": username,
                    "pwd": password
                }
            }
        }

        response = http.request(
            'POST',
            login_url,
            body=json.dumps(login_data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            timeout=30
        )

        if response.status != 200:
            raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}")

        response_data = json.loads(response.data.decode('utf-8'))
        token = response_data['imdata'][0]['aaaLogin']['attributes']['token']
        logger.info("Successfully authenticated to APIC")
        return token


    def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):
        """
        Collect data from APIC REST API with pagination
        """
        all_data = []
        page = 0

        while page < max_pages:
            # Build API URL with pagination and time filters
            api_url = f"{apic_url}/api/class/{data_type}.json"
            params = [
                f'page-size={page_size}',
                f'page={page}',
                f'order-by={data_type}.created|asc'
            ]

            # Add time filter for all data types to prevent duplicates
            time_attr = 'created'
            if last_timestamp:
                params.append(f'query-target-filter=gt({data_type}.{time_attr},"{last_timestamp}")')

            full_url = f"{api_url}?{'&'.join(params)}"
            logger.info(f"Fetching {data_type} page {page} from APIC")

            # Make API request
            response = http.request('GET', full_url, headers=headers, timeout=60)

            if response.status != 200:
                logger.error(f"API request failed: {response.status} {response.data[:256]!r}")
                break

            data = json.loads(response.data.decode('utf-8'))
            records = data.get('imdata', [])

            if not records:
                logger.info(f"No more {data_type} records found")
                break

            # Extract the actual data from APIC format
            extracted_records = []
            for record in records:
                if data_type in record:
                    extracted_records.append(record[data_type])

            all_data.extend(extracted_records)
            page += 1

            # If we got less than page_size records, we've reached the end
            if len(records) < page_size:
                break

        return all_data


    def get_last_timestamp(bucket, state_key):
        """
        Get the last run timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            logger.info("No state file found, starting from 1 hour ago")
            return None
        except Exception as e:
            logger.warning(f"Error reading state file: {str(e)}")
            return None


    def get_latest_timestamp_from_records(records):
        """
        Get the latest timestamp from collected records to prevent missing events
        """
        if not records:
            return None

        latest = None
        latest_time = None

        for record in records:
            try:
                # Handle both direct attributes and nested structure
                attrs = record.get('attributes', record)
                created = attrs.get('created')
                modTs = attrs.get('modTs')  # Fallback for some object types

                timestamp = created or modTs
                if timestamp:
                    if latest_time is None or timestamp > latest_time:
                        latest_time = timestamp
                        latest = record
            except Exception as e:
                logger.debug(f"Error parsing timestamp from record: {e}")
                continue

        return latest_time


    def update_state(bucket, state_key, timestamp):
        """
        Update the state file with the current timestamp
        """
        try:
            state_data = {
                'last_timestamp': timestamp,
                'updated_at': datetime.utcnow().isoformat() + 'Z'
            }

            s3_client.put_object(
                Bucket=bucket,
                Key=state_key,
                Body=json.dumps(state_data).encode('utf-8'),
                ContentType='application/json'
            )
            logger.info(f"Updated state file with timestamp: {timestamp}")
        except Exception as e:
            logger.error(f"Error updating state file: {str(e)}")
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables, replacing the example values with your own:

    • S3_BUCKET: cisco-aci-logs
    • S3_PREFIX: cisco-aci-events/
    • STATE_KEY: cisco-aci-events/state.json
    • APIC_URL: https://apic.example.com
    • APIC_USERNAME: <your-apic-username>
    • APIC_PASSWORD: <your-apic-password>
    • PAGE_SIZE: 100 (optional, controls pagination size)
    • MAX_PAGES: 10 (optional, limits total pages fetched per run)
  8. After the function is created, stay on its page (or open Lambda > Functions > cisco-aci-events-collector).

  9. Select the Configuration tab.

  10. In the General configuration panel, click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.
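
Before adding a schedule, you can optionally trigger the function once to confirm it can reach the APIC and write to S3. This sketch assumes your local AWS credentials are allowed to invoke the function and that the function name matches the one created above.

    import json
    import boto3

    # Invoke the collector function once and print its response payload
    lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName="cisco-aci-events-collector",
        InvocationType="RequestResponse",
        Payload=json.dumps({}).encode("utf-8"),
    )
    print(json.loads(response["Payload"].read()))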

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (15 minutes).
    • Target: your Lambda function cisco-aci-events-collector.
    • Name: cisco-aci-events-collector-15m.
  3. Click Create schedule.

Optional: Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::cisco-aci-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::cisco-aci-logs"
        }
      ]
    }
    
  7. Set the name to secops-reader-policy.

  8. Click Create policy, then search for and select secops-reader-policy > Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV (these values are entered into the feed).

Configure a feed in Google SecOps to ingest Cisco ACI logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Cisco ACI JSON logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Cisco Application Centric Infrastructure as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://cisco-aci-logs/cisco-aci-events/
    • Source deletion options: Select deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace .
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.
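
To verify that the collector is producing files for the feed to pick up, you can list the objects under the configured prefix. This minimal sketch assumes read access to the bucket (for example, using the secops-reader credentials created earlier) and that the bucket and prefix names match the values above.

    import boto3

    # Placeholder bucket and prefix: adjust if you used different values
    BUCKET = "cisco-aci-logs"
    PREFIX = "cisco-aci-events/"

    s3 = boto3.client("s3")

    # List the most recently modified files written by the Lambda collector
    response = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)
    objects = sorted(response.get("Contents", []), key=lambda o: o["LastModified"], reverse=True)
    for obj in objects[:5]:
        print(obj["Key"], obj["Size"], obj["LastModified"])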

UDM Mapping Table

| Log Field | UDM Mapping | Logic |
| --- | --- | --- |
| @timestamp | read_only_udm.metadata.event_timestamp | Value is taken from the raw log field '@timestamp' and parsed as a timestamp. |
| aci_tag | read_only_udm.metadata.product_log_id | Value is taken from the raw log field 'aci_tag'. |
| cisco_timestamp | - | Not mapped. |
| DIP | read_only_udm.target.ip | Value is taken from the raw log field 'DIP'. |
| DPort | read_only_udm.target.port | Value is taken from the raw log field 'DPort' and converted to integer. |
| description | read_only_udm.security_result.description | Value is taken from the raw log field 'description'. |
| fault_cause | read_only_udm.additional.fields.value.string_value | Value is taken from the raw log field 'fault_cause'. The key is set to 'Fault Cause'. |
| hostname | read_only_udm.principal.hostname | Value is taken from the raw log field 'hostname'. |
| lifecycle_state | read_only_udm.metadata.product_event_type | Value is taken from the raw log field 'lifecycle_state'. |
| log.source.address | - | Not mapped. |
| logstash.collect.host | - | Not mapped. |
| logstash.collect.timestamp | read_only_udm.metadata.collected_timestamp | Value is taken from the raw log field 'logstash.collect.timestamp' and parsed as a timestamp. |
| logstash.ingest.host | read_only_udm.intermediary.hostname | Value is taken from the raw log field 'logstash.ingest.host'. |
| logstash.irm_environment | read_only_udm.additional.fields.value.string_value | Value is taken from the raw log field 'logstash.irm_environment'. The key is set to 'IRM_Environment'. |
| logstash.irm_region | read_only_udm.additional.fields.value.string_value | Value is taken from the raw log field 'logstash.irm_region'. The key is set to 'IRM_Region'. |
| logstash.irm_site | read_only_udm.additional.fields.value.string_value | Value is taken from the raw log field 'logstash.irm_site'. The key is set to 'IRM_Site'. |
| logstash.process.host | read_only_udm.intermediary.hostname | Value is taken from the raw log field 'logstash.process.host'. |
| message | - | Not mapped. |
| message_class | - | Not mapped. |
| message_code | - | Not mapped. |
| message_content | - | Not mapped. |
| message_dn | - | Not mapped. |
| message_type | read_only_udm.metadata.product_event_type | Value is taken from the raw log field 'message_type' after removing square brackets. |
| node_link | read_only_udm.principal.process.file.full_path | Value is taken from the raw log field 'node_link'. |
| PktLen | read_only_udm.network.received_bytes | Value is taken from the raw log field 'PktLen' and converted to unsigned integer. |
| program | - | Not mapped. |
| Proto | read_only_udm.network.ip_protocol | Value is taken from the raw log field 'Proto', converted to integer, and mapped to the corresponding IP protocol name (for example, 6 -> TCP). |
| SIP | read_only_udm.principal.ip | Value is taken from the raw log field 'SIP'. |
| SPort | read_only_udm.principal.port | Value is taken from the raw log field 'SPort' and converted to integer. |
| syslog_facility | - | Not mapped. |
| syslog_facility_code | - | Not mapped. |
| syslog_host | read_only_udm.principal.ip, read_only_udm.observer.ip | Value is taken from the raw log field 'syslog_host'. |
| syslog_prog | - | Not mapped. |
| syslog_severity | read_only_udm.security_result.severity_details | Value is taken from the raw log field 'syslog_severity'. |
| syslog_severity_code | read_only_udm.security_result.severity | Value is taken from the raw log field 'syslog_severity_code' and mapped to the corresponding severity level: 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH. |
| syslog5424_pri | - | Not mapped. |
| Vlan-Id | read_only_udm.principal.resource.id | Value is taken from the raw log field 'Vlan-Id'. |
| - | read_only_udm.metadata.event_type | If 'SIP' or 'hostname' is present and 'Proto' is present, set to 'NETWORK_CONNECTION'. Else if 'SIP', 'hostname', or 'syslog_host' is present, set to 'STATUS_UPDATE'. Otherwise, set to 'GENERIC_EVENT'. |
| - | read_only_udm.metadata.log_type | Set to 'CISCO_ACI'. |
| - | read_only_udm.metadata.vendor_name | Set to 'Cisco'. |
| - | read_only_udm.metadata.product_name | Set to 'ACI'. |
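
The following sketch only illustrates the syslog_severity_code mapping described in the table above; it is not the parser's actual implementation, and the function name is purely illustrative.

    def map_syslog_severity(code: int) -> str:
        """Illustrative mapping of syslog_severity_code to UDM severity, per the table above."""
        if code in (5, 6, 7):
            return "INFORMATIONAL"
        if code in (3, 4):
            return "MEDIUM"
        if code in (0, 1, 2):
            return "HIGH"
        return "UNKNOWN_SEVERITY"

    print(map_syslog_severity(3))  # MEDIUM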

Need more help? Get answers from Community members and Google SecOps professionals.
