Collect Cisco Application Centric Infrastructure (ACI) logs

Supported in:

This document explains how to ingest Cisco Application Centric Infrastructure (ACI) logs to Google Security Operations. The parser first attempts to process incoming Cisco ACI logs as syslog messages using Grok patterns. If the syslog parsing fails, it assumes the message is in JSON format and parses it accordingly. Finally, it maps the extracted fields to the unified data model (UDM).

This integration supports two methods:

  • Option 1: Syslog format via Bindplane agent
  • Option 2: JSON format via AWS S3 using APIC REST API

Each option is self-contained and can be implemented independently based on your infrastructure requirements and log format preferences.

Option 1: Syslog via Bindplane agent

This option configures Cisco ACI fabric to send syslog messages to a Bindplane agent, which forwards them to Chronicle for analysis.

Before you begin

Make sure you have the following prerequisites:

  • A Google SecOps instance
  • A Windows 2016 or later or Linux host with systemd
  • If running behind a proxy, ensure firewall ports are open per the Bindplane agent requirements
  • Privileged access to the Cisco APIC console

Get Google SecOps ingestion authentication file

  1. Sign in to the Google SecOps console.
  2. Go to SIEM Settings > Collection Agents.
  3. Download the Ingestion Authentication File. Save the file securely on the system where Bindplane will be installed.

Get Google SecOps customer ID

  1. Sign in to the Google SecOps console.
  2. Go to SIEM Settings > Profile.
  3. Copy and save the Customer ID from the Organization Details section.

Install the Bindplane agent

Install the Bindplane agent on your Windows or Linux operating system according to the following instructions.

Windows installation

  1. Open the Command Prompt or PowerShell as an administrator.
  2. Run the following command:

    msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet 

Linux installation

  1. Open a terminal with root or sudo privileges.
  2. Run the following command:

    sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh 

Additional installation resources

Configure the Bindplane agent to ingest Syslog and send to Google SecOps

  1. Access the configuration file:

    1. Locate the config.yaml file. Typically, it's in the /etc/bindplane-agent/ directory on Linux or in the installation directory on Windows.
    2. Open the file using a text editor (for example, nano, vi, or Notepad).
  2. Edit the config.yaml file as follows:

    receivers:  udplog:  # Replace the port and IP address as required  listen_address: "0.0.0.0:514" exporters:  chronicle/chronicle_w_labels:  compression: gzip  # Adjust the path to the credentials file you downloaded in Step 1  creds_file_path: '/path/to/ingestion-authentication-file.json'  # Replace with your actual customer ID from Step 2  customer_id: <CUSTOMER_ID>  endpoint: malachiteingestion-pa.googleapis.com  # Add optional ingestion labels for better organization  log_type: 'CISCO_ACI'  raw_log_field: body  ingestion_labels: service:  pipelines:  logs/source0__chronicle_w_labels-0:  receivers:  - udplog  exporters:  - chronicle/chronicle_w_labels 
    • Replace the port and IP address as required in your infrastructure.
    • Replace <customer_id> with the actual customer ID.
    • Update /path/to/ingestion-authentication-file.json to the path where the authentication file was saved in the Get Google SecOps ingestion authentication file section.

Restart the Bindplane agent to apply the changes

  • To restart the Bindplane agent in Linux, run the following command:

    sudo systemctl restart bindplane-agent 
  • To restart the Bindplane agent in Windows, you can either use the Services console or enter the following command:

    net stop BindPlaneAgent && net start BindPlaneAgent 

Configure Syslog forwarding on Cisco ACI

Configure Out-of-Band Management Contract

  1. Sign in to the Cisco APIC console.
  2. Go to Tenants > mgmt > Contracts > Filters.
  3. Click Create Filter.
  4. Provide the following configuration details:
    • Name: Enter syslog-udp-514.
    • Entry Name: Enter syslog.
    • EtherType: Select IP.
    • IP Protocol: Select UDP.
    • Destination Port Range From: Enter 514.
    • Destination Port Range To: Enter 514.
  5. Click Submit.

Create Management Contract

  1. Go to Tenants > mgmt > Contracts > Standard.
  2. Click Create Contract.
  3. Provide the following configuration details:
    • Name: Enter mgmt-syslog-contract.
    • Scope: Select Context.
  4. Click Submit.
  5. Expand the contract and click Subjects.
  6. Click Create Contract Subject.
  7. Provide the following configuration details:
    • Name: Enter syslog-subject.
    • Apply Both Directions: Check this option.
  8. Click Submit.
  9. Expand the subject and click Filters.
  10. Click Create Filter Binding.
  11. Select the syslog-udp-514 filter created earlier.
  12. Click Submit.

Configure Syslog Destination Group

  1. Go to Admin > External Data Collectors > Monitoring Destinations > Syslog.
  2. Right-click Syslog and select Create Syslog Monitoring Destination Group.
  3. Provide the following configuration details:
    • Name: Enter Chronicle-Syslog-Group.
    • Admin State: Select Enabled.
    • Format: Select aci.
  4. Click Next.
  5. In the Create Syslog Monitoring Destination dialog:
    • Name: Enter Chronicle-BindPlane.
    • Host: Enter the IP address of your Bindplane agent server.
    • Port: Enter 514.
    • Admin State: Select Enabled.
    • Severity: Select information (to capture detailed logs).
  6. Click Submit.

Configure Monitoring Policies

Fabric Monitoring Policy

  1. Go to Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Expand Callhome/Smart Callhome/SNMP/Syslog/TACACS.
  3. Right-click Syslog and select Create Syslog Source.
  4. Provide the following configuration details:
    • Name: Enter Chronicle-Fabric-Syslog.
    • Audit Logs: Check to include audit events.
    • Events: Check to include system events.
    • Faults: Check to include fault events.
    • Session Logs: Check to include session logs.
    • Destination Group: Select Chronicle-Syslog-Group.
  5. Click Submit.

Access Monitoring Policy

  1. Go to Fabric > Access Policies > Policies > Monitoring > Default Policy.
  2. Expand Callhome/Smart Callhome/SNMP/Syslog.
  3. Right-click Syslog and select Create Syslog Source.
  4. Provide the following configuration details:
    • Name: Enter Chronicle-Access-Syslog.
    • Audit Logs: Check to include audit events.
    • Events: Check to include system events.
    • Faults: Check to include fault events.
    • Session Logs: Check to include session logs.
    • Destination Group: Select Chronicle-Syslog-Group.
  5. Click Submit.

Configure System Syslog Messages Policy

  1. Go to Fabric > Fabric Policies > Policies > Monitoring > Common Policy.
  2. Expand Syslog Messages Policies.
  3. Click default.
  4. In the Facility Filter section:
    • Facility: Select default.
    • Minimum Severity: Change to information.
  5. Click Submit.

Option 2: JSON via AWS S3

This option uses the APIC REST API to collect JSON-formatted events, faults, and audit logs from Cisco ACI fabric and stores them in AWS S3 for SecOps ingestion.

Before you begin

  • Google SecOps instance.
  • Privileged access to Cisco APIC console.
  • Privileged access to AWS (S3, IAM, Lambda, EventBridge).

Collect Cisco ACI APIC prerequisites (IDs, API keys, org IDs, tokens)

  1. Sign in to the Cisco APIC console using HTTPS.
  2. Go to Admin > AAA (on APIC 6.0+) or Admin > Authentication > AAA (on older releases).
    • Note: The AAA menu path changed starting in APIC 6.0(1).
  3. Create or use an existing local user with appropriate privileges.
  4. Copy and save in a secure location the following details:
    • APIC Username: Local user with read access to monitoring data
    • APIC Password: User password
    • APIC URL: The HTTPS URL of your APIC (for example, https://apic.example.com)

Configure AWS S3 bucket and IAM for Google SecOps

  1. Create Amazon S3 bucket following this user guide: Creating a bucket.
  2. Save bucket Name and Region for future reference (for example, cisco-aci-logs).
  3. Create a User following this user guide: Creating an IAM user.
  4. Select the created User.
  5. Select Security credentials tab.
  6. Click Create Access Key in section Access Keys.
  7. Select Third-party service as Use case.
  8. Click Next.
  9. Optional: Add a description tag.
  10. Click Create access key.
  11. Click Download CSV file to save the Access Key and Secret Access Key for future reference.
  12. Click Done.
  13. Select Permissions tab.
  14. Click Add permissions in section Permissions policies.
  15. Select Add permissions.
  16. Select Attach policies directly.
  17. Search for AmazonS3FullAccess policy.
  18. Select the policy.
  19. Click Next.
  20. Click Add permissions.

Configure the IAM policy and role for S3 uploads

  1. In the AWS console, go to IAM > Policies.
  2. Click Create policy > JSON tab.
  3. Enter the following policy:

    {  "Version": "2012-10-17",  "Statement": [  {  "Sid": "AllowPutObjects",  "Effect": "Allow",  "Action": "s3:PutObject",  "Resource": "arn:aws:s3:::cisco-aci-logs/*"  },  {  "Sid": "AllowGetStateObject",  "Effect": "Allow",  "Action": "s3:GetObject",  "Resource": "arn:aws:s3:::cisco-aci-logs/cisco-aci-events/state.json"  }  ] } 
    • Replace cisco-aci-logs if you entered a different bucket name.
  4. Click Next > Create policy.

  5. Go to IAM > Roles > Create role > AWS service > Lambda.

  6. Attach the newly created policy and the AWSLambdaBasicExecutionRole managed policy.

  7. Name the role cisco-aci-lambda-role and click Create role.

Create the Lambda function

  1. In the AWS Console, go to Lambda > Functions > Create function.
  2. Click Author from scratch.
  3. Provide the following configuration details:

    • Name: cisco-aci-events-collector
    • Runtime: Python 3.13
    • Architecture: x86_64
    • Execution role: cisco-aci-lambda-role
  4. After the function is created, open the Code tab, delete the stub and enter the following code (cisco-aci-events-collector.py):

    import json import boto3 import urllib3 import base64 from datetime import datetime, timedelta import os import logging # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client and HTTP pool manager s3_client = boto3.client('s3') http = urllib3.PoolManager() def lambda_handler(event, context):  """  AWS Lambda handler to fetch Cisco ACI events, faults, and audit logs and store them in S3  """ try: # Get environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] apic_url = os.environ['APIC_URL'] apic_username = os.environ['APIC_USERNAME'] apic_password = os.environ['APIC_PASSWORD'] # Optional parameters page_size = int(os.environ.get('PAGE_SIZE', '100')) max_pages = int(os.environ.get('MAX_PAGES', '10')) logger.info(f"Starting Cisco ACI data collection for bucket: {s3_bucket}") # Get last run timestamp from state file last_timestamp = get_last_timestamp(s3_bucket, state_key) if not last_timestamp: last_timestamp = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z' # Authenticate to APIC session_token = authenticate_apic(apic_url, apic_username, apic_password) headers = { 'Cookie': f'APIC-cookie={session_token}', 'Accept': 'application/json', 'Content-Type': 'application/json' } # Data types to collect data_types = ['faultInst', 'eventRecord', 'aaaModLR'] all_collected_data = [] for data_type in data_types: logger.info(f"Collecting {data_type} data") collected_data = collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages) # Tag each record with its type for record in collected_data: record['_data_type'] = data_type all_collected_data.extend(collected_data) logger.info(f"Collected {len(collected_data)} {data_type} records") logger.info(f"Total records collected: {len(all_collected_data)}") # Store data in S3 if any were collected if all_collected_data: timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S') s3_key = f"{s3_prefix}cisco_aci_events_{timestamp_str}.ndjson" # Convert to NDJSON format (one JSON object per line) ndjson_content = '\n'.join(json.dumps(record) for record in all_collected_data) # Upload to S3 s3_client.put_object( Bucket=s3_bucket, Key=s3_key, Body=ndjson_content.encode('utf-8'), ContentType='application/x-ndjson' ) logger.info(f"Uploaded {len(all_collected_data)} records to s3://{s3_bucket}/{s3_key}") # Update state file with latest timestamp from collected data latest_timestamp = get_latest_timestamp_from_records(all_collected_data) if not latest_timestamp: latest_timestamp = datetime.utcnow().isoformat() + 'Z' update_state(s3_bucket, state_key, latest_timestamp) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Success', 'total_records_collected': len(all_collected_data), 'data_types_collected': data_types }) } except Exception as e: logger.error(f"Error in lambda_handler: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) } def authenticate_apic(apic_url, username, password):  """  Authenticate to APIC and return session token  """ login_url = f"{apic_url}/api/aaaLogin.json" login_data = { "aaaUser": { "attributes": { "name": username, "pwd": password } } } response = http.request( 'POST', login_url, body=json.dumps(login_data).encode('utf-8'), headers={'Content-Type': 'application/json'}, timeout=30 ) if response.status != 200: raise RuntimeError(f"APIC authentication failed: {response.status} {response.data[:256]!r}") response_data = json.loads(response.data.decode('utf-8')) token = response_data['imdata'][0]['aaaLogin']['attributes']['token'] logger.info("Successfully authenticated to APIC") return token def collect_aci_data(apic_url, headers, data_type, last_timestamp, page_size, max_pages):  """  Collect data from APIC REST API with pagination  """ all_data = [] page = 0 while page < max_pages: # Build API URL with pagination and time filters api_url = f"{apic_url}/api/class/{data_type}.json" params = [ f'page-size={page_size}', f'page={page}', f'order-by={data_type}.created|asc' ] # Add time filter for all data types to prevent duplicates time_attr = 'created' if last_timestamp: params.append(f'query-target-filter=gt({data_type}.{time_attr},"{last_timestamp}")') full_url = f"{api_url}?{'&'.join(params)}" logger.info(f"Fetching {data_type} page {page} from APIC") # Make API request response = http.request('GET', full_url, headers=headers, timeout=60) if response.status != 200: logger.error(f"API request failed: {response.status} {response.data[:256]!r}") break data = json.loads(response.data.decode('utf-8')) records = data.get('imdata', []) if not records: logger.info(f"No more {data_type} records found") break # Extract the actual data from APIC format extracted_records = [] for record in records: if data_type in record: extracted_records.append(record[data_type]) all_data.extend(extracted_records) page += 1 # If we got less than page_size records, we've reached the end if len(records) < page_size: break return all_data def get_last_timestamp(bucket, state_key):  """  Get the last run timestamp from S3 state file  """ try: response = s3_client.get_object(Bucket=bucket, Key=state_key) state_data = json.loads(response['Body'].read().decode('utf-8')) return state_data.get('last_timestamp') except s3_client.exceptions.NoSuchKey: logger.info("No state file found, starting from 1 hour ago") return None except Exception as e: logger.warning(f"Error reading state file: {str(e)}") return None def get_latest_timestamp_from_records(records):  """  Get the latest timestamp from collected records to prevent missing events  """ if not records: return None latest = None latest_time = None for record in records: try: # Handle both direct attributes and nested structure attrs = record.get('attributes', record) created = attrs.get('created') modTs = attrs.get('modTs') # Fallback for some object types timestamp = created or modTs if timestamp: if latest_time is None or timestamp > latest_time: latest_time = timestamp latest = record except Exception as e: logger.debug(f"Error parsing timestamp from record: {e}") continue return latest_time def update_state(bucket, state_key, timestamp):  """  Update the state file with the current timestamp  """ try: state_data = { 'last_timestamp': timestamp, 'updated_at': datetime.utcnow().isoformat() + 'Z' } s3_client.put_object( Bucket=bucket, Key=state_key, Body=json.dumps(state_data).encode('utf-8'), ContentType='application/json' ) logger.info(f"Updated state file with timestamp: {timestamp}") except Exception as e: logger.error(f"Error updating state file: {str(e)}") 
  5. Go to Configuration > Environment variables.

  6. Click Edit > Add new environment variable.

  7. Enter the following environment variables provided, replacing with your values.

    • S3_BUCKET: cisco-aci-logs
    • S3_PREFIX: cisco-aci-events/
    • STATE_KEY: cisco-aci-events/state.json
    • APIC_URL: https://apic.example.com
    • APIC_USERNAME: <your-apic-username>
    • APIC_PASSWORD: <your-apic-password>
    • PAGE_SIZE: 100 (optional, controls pagination size)
    • MAX_PAGES: 10 (optional, limits total pages fetched per run)
  8. After the function is created, stay on its page (or open Lambda > Functions > cisco-aci-events-collector).

  9. Select the Configuration tab.

  10. In the General configuration panel click Edit.

  11. Change Timeout to 5 minutes (300 seconds) and click Save.

Create an EventBridge schedule

  1. Go to Amazon EventBridge > Scheduler > Create schedule.
  2. Provide the following configuration details:
    • Recurring schedule: Rate (15 minutes).
    • Target: your Lambda function cisco-aci-events-collector.
    • Name: cisco-aci-events-collector-15m.
  3. Click Create schedule.

Optional: Create read-only IAM user & keys for Google SecOps

  1. Go to AWS Console > IAM > Users > Add users.
  2. Click Add users.
  3. Provide the following configuration details:
    • User: Enter secops-reader.
    • Access type: Select Access key – Programmatic access.
  4. Click Create user.
  5. Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. In the JSON editor, enter the following policy:

    {  "Version": "2012-10-17",  "Statement": [  {  "Effect": "Allow",  "Action": ["s3:GetObject"],  "Resource": "arn:aws:s3:::cisco-aci-logs/*"  },  {  "Effect": "Allow",  "Action": ["s3:ListBucket"],  "Resource": "arn:aws:s3:::cisco-aci-logs"  }  ] } 
  7. Set the name to secops-reader-policy.

  8. Go to Create policy > search/select > Next > Add permissions.

  9. Go to Security credentials > Access keys > Create access key.

  10. Download the CSV (these values are entered into the feed).

Configure a feed in Google SecOps to ingest Cisco ACI logs

  1. Go to SIEM Settings > Feeds.
  2. Click + Add New Feed.
  3. In the Feed name field, enter a name for the feed (for example, Cisco ACI JSON logs).
  4. Select Amazon S3 V2 as the Source type.
  5. Select Cisco Application Centric Infrastructure as the Log type.
  6. Click Next.
  7. Specify values for the following input parameters:
    • S3 URI: s3://cisco-aci-logs/cisco-aci-events/
    • Source deletion options: Select deletion option according to your preference.
    • Maximum File Age: Include files modified in the last number of days. Default is 180 days.
    • Access Key ID: User access key with access to the S3 bucket.
    • Secret Access Key: User secret key with access to the S3 bucket.
    • Asset namespace: The asset namespace.
    • Ingestion labels: The label applied to the events from this feed.
  8. Click Next.
  9. Review your new feed configuration in the Finalize screen, and then click Submit.

UDM Mapping Table

Log Field UDM Mapping Logic
@timestamp read_only_udm.metadata.event_timestamp Value is taken from the raw log field '@timestamp' and parsed as a timestamp.
aci_tag read_only_udm.metadata.product_log_id Value is taken from the raw log field 'aci_tag'.
cisco_timestamp - Not mapped.
DIP read_only_udm.target.ip Value is taken from the raw log field 'DIP'.
DPort read_only_udm.target.port Value is taken from the raw log field 'DPort' and converted to integer.
description read_only_udm.security_result.description Value is taken from the raw log field 'description'.
fault_cause read_only_udm.additional.fields.value.string_value Value is taken from the raw log field 'fault_cause'. The key is set to 'Fault Cause'.
hostname read_only_udm.principal.hostname Value is taken from the raw log field 'hostname'.
lifecycle_state read_only_udm.metadata.product_event_type Value is taken from the raw log field 'lifecycle_state'.
log.source.address - Not mapped.
logstash.collect.host - Not mapped.
logstash.collect.timestamp read_only_udm.metadata.collected_timestamp Value is taken from the raw log field 'logstash.collect.timestamp' and parsed as a timestamp.
logstash.ingest.host read_only_udm.intermediary.hostname Value is taken from the raw log field 'logstash.ingest.host'.
logstash.irm_environment read_only_udm.additional.fields.value.string_value Value is taken from the raw log field 'logstash.irm_environment'. The key is set to 'IRM_Environment'.
logstash.irm_region read_only_udm.additional.fields.value.string_value Value is taken from the raw log field 'logstash.irm_region'. The key is set to 'IRM_Region'.
logstash.irm_site read_only_udm.additional.fields.value.string_value Value is taken from the raw log field 'logstash.irm_site'. The key is set to 'IRM_Site'.
logstash.process.host read_only_udm.intermediary.hostname Value is taken from the raw log field 'logstash.process.host'.
message - Not mapped.
message_class - Not mapped.
message_code - Not mapped.
message_content - Not mapped.
message_dn - Not mapped.
message_type read_only_udm.metadata.product_event_type Value is taken from the raw log field 'message_type' after removing square brackets.
node_link read_only_udm.principal.process.file.full_path Value is taken from the raw log field 'node_link'.
PktLen read_only_udm.network.received_bytes Value is taken from the raw log field 'PktLen' and converted to unsigned integer.
program - Not mapped.
Proto read_only_udm.network.ip_protocol Value is taken from the raw log field 'Proto', converted to integer, and mapped to the corresponding IP protocol name (e.g., 6 -> TCP).
SIP read_only_udm.principal.ip Value is taken from the raw log field 'SIP'.
SPort read_only_udm.principal.port Value is taken from the raw log field 'SPort' and converted to integer.
syslog_facility - Not mapped.
syslog_facility_code - Not mapped.
syslog_host read_only_udm.principal.ip, read_only_udm.observer.ip Value is taken from the raw log field 'syslog_host'.
syslog_prog - Not mapped.
syslog_severity read_only_udm.security_result.severity_details Value is taken from the raw log field 'syslog_severity'.
syslog_severity_code read_only_udm.security_result.severity Value is taken from the raw log field 'syslog_severity_code' and mapped to the corresponding severity level: 5, 6, 7 -> INFORMATIONAL; 3, 4 -> MEDIUM; 0, 1, 2 -> HIGH.
syslog5424_pri - Not mapped.
Vlan-Id read_only_udm.principal.resource.id Value is taken from the raw log field 'Vlan-Id'.
- read_only_udm.metadata.event_type Logic: If 'SIP' or 'hostname' is present and 'Proto' is present, set to 'NETWORK_CONNECTION'. Else if 'SIP', 'hostname', or 'syslog_host' is present, set to 'STATUS_UPDATE'. Otherwise, set to 'GENERIC_EVENT'.
- read_only_udm.metadata.log_type Logic: Set to 'CISCO_ACI'.
- read_only_udm.metadata.vendor_name Logic: Set to 'Cisco'.
- read_only_udm.metadata.product_name Logic: Set to 'ACI'.

Need more help? Get answers from Community members and Google SecOps professionals.