Collect Elasticsearch logs
This document explains how to ingest Elasticsearch logs to Google Security Operations using Amazon S3. The parser transforms raw JSON-formatted logs into a unified data model (UDM). It extracts fields from nested JSON structures, maps them to UDM fields, and enriches the data with security-relevant context such as severity levels and user roles.
Before you begin
- A Google SecOps instance
- Privileged access to Elasticsearch cluster administration
- Privileged access to AWS (S3, IAM, EC2)
- EC2 instance or persistent host to run Logstash
Get Elasticsearch prerequisites
- Sign in to your Elasticsearch cluster as an administrator.
- Verify that your Elasticsearch subscription includes Security features (required for audit logging).
- Note your Elasticsearch cluster name and version for reference.
- Identify the path where audit logs will be written (default: `$ES_HOME/logs/<clustername>_audit.json`).
Enable Elasticsearch audit logging
- On each Elasticsearch node, edit the `elasticsearch.yml` configuration file.
- Add the following setting:

  ```
  xpack.security.audit.enabled: true
  ```

- Perform a rolling restart of the cluster to apply changes:
  - Disable shard allocation:

    ```
    PUT _cluster/settings
    {"persistent": {"cluster.routing.allocation.enable": "primaries"}}
    ```

  - Stop and restart each node one at a time.
  - Re-enable shard allocation:

    ```
    PUT _cluster/settings
    {"persistent": {"cluster.routing.allocation.enable": null}}
    ```

- Verify that audit logs are being generated at `<clustername>_audit.json` in the logs directory.
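The audit file is newline-delimited JSON (one event object per line), which is what the Logstash `json` codec in the next section assumes. A minimal sketch for verifying that a log file has this shape before shipping it (the helper name `validate_ndjson` is illustrative, not part of any Elastic or Logstash API):

```python
import json
from pathlib import Path

def validate_ndjson(path: str, max_lines: int = 100) -> int:
    """Check that each non-empty line in an audit log file is a standalone
    JSON object. Returns the number of lines checked; raises ValueError on
    the first malformed line."""
    checked = 0
    for lineno, line in enumerate(Path(path).read_text().splitlines(), start=1):
        if not line.strip():
            continue  # skip blank lines
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {lineno} is not valid JSON: {exc}")
        if not isinstance(obj, dict):
            raise ValueError(f"line {lineno} is not a JSON object")
        checked += 1
        if checked >= max_lines:
            break
    return checked
```

If this raises, audit logging is likely misconfigured or the file is a multi-line (pretty-printed) format that the `json` codec cannot consume line by line.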
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, `elastic-search-logs`).
- Create a User following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: Add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for future reference.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for the AmazonS3FullAccess policy.
- Select the policy.
- Click Next.
- Click Add permissions.
Configure Logstash to ship audit logs to S3
- Install Logstash on an EC2 instance or persistent host that can access the Elasticsearch audit log files.
- Install the S3 output plugin if not already present:

  ```
  bin/logstash-plugin install logstash-output-s3
  ```

- Create a Logstash configuration file (`elastic-to-s3.conf`):

  ```
  input {
    file {
      path => "/path/to/elasticsearch/logs/*_audit.json"
      start_position => "beginning"
      codec => "json"   # audit file: 1 JSON object per line
      sincedb_path => "/var/lib/logstash/sincedb_elastic_search"
      exclude => [ "*.gz" ]
    }
  }

  filter {
    # Intentionally minimal: do NOT reshape the audit JSON that the
    # ELASTIC_SEARCH parser expects.
    # If you must add metadata for ops, put it under [@metadata] so it
    # won't be written.
    # ruby { code => "event.set('[@metadata][ingested_at]', Time.now.utc.iso8601)" }
  }

  output {
    s3 {
      access_key_id => "YOUR_AWS_ACCESS_KEY"
      secret_access_key => "YOUR_AWS_SECRET_KEY"
      region => "us-east-1"
      bucket => "elastic-search-logs"
      prefix => "logs/%{+YYYY}/%{+MM}/%{+dd}/"
      codec => "json_lines"            # NDJSON output (1 JSON object per line)
      encoding => "gzip"               # compress objects
      server_side_encryption => true
      # Optionally for KMS:
      # server_side_encryption_kms_key_id => "arn:aws:kms:REGION:ACCT:key/KEY_ID"
      size_file => 104857600           # 100 MB rotation
      time_file => 300                 # 5 min rotation
    }
  }
  ```

- Start Logstash with the configuration:

  ```
  bin/logstash -f elastic-to-s3.conf
  ```
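With `codec => json_lines` and `encoding => gzip`, each object Logstash writes to S3 is a gzip-compressed NDJSON file. A minimal sketch for spot-checking a downloaded object's contents (the helper name `read_ndjson_gz` is illustrative):

```python
import gzip
import json

def read_ndjson_gz(data: bytes) -> list:
    """Decode one S3 object written with `codec => json_lines` and
    `encoding => gzip`: gunzip the bytes, then parse one JSON object
    per line."""
    text = gzip.decompress(data).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]
```

If decoding fails, check that `encoding => gzip` is actually set; if individual lines fail to parse, the filter block is probably reshaping events and should be reverted to the minimal form shown above.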
Optional: Create read-only IAM user for Google SecOps
- Go to AWS Console > IAM > Users > Add users.
- Provide the following configuration details:
  - User: Enter `secops-reader`.
  - Access type: Select Access key – Programmatic access.
- Click Create user.
- Attach a minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

  ```json
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": ["s3:GetObject"],
        "Resource": "arn:aws:s3:::elastic-search-logs/*"
      },
      {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::elastic-search-logs"
      }
    ]
  }
  ```

- Set the name to `secops-reader-policy`.
- Go to Create policy > search/select > Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV file (these values are entered into the feed).
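Before attaching the policy, you can sanity-check that it grants nothing beyond the two read actions the feed needs. A minimal sketch (the helper `policy_is_read_only` is illustrative, not an AWS API; it assumes the simple single-bucket policy shape shown above):

```python
import json

# The only actions the Google SecOps feed needs on the bucket.
ALLOWED_ACTIONS = {"s3:GetObject", "s3:ListBucket"}

def policy_is_read_only(policy_json: str) -> bool:
    """Return True if every Allow statement in the IAM policy grants only
    actions from ALLOWED_ACTIONS."""
    policy = json.loads(policy_json)
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        actions = stmt.get("Action", [])
        if isinstance(actions, str):  # "Action" may be a bare string
            actions = [actions]
        if not set(actions) <= ALLOWED_ACTIONS:
            return False
    return True
```

This is a local lint only; IAM itself remains the source of truth for what the user can do.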
Configure a feed in Google SecOps to ingest Elasticsearch logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, `Elasticsearch Logs`).
- Select Amazon S3 V2 as the Source type.
- Select Elastic Search as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: `s3://elastic-search-logs/logs/`
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Include files modified in the last number of days. Default is 180 days.
  - Access Key ID: User access key with access to the S3 bucket.
  - Secret Access Key: User secret key with access to the S3 bucket.
  - Asset namespace: The asset namespace.
  - Ingestion labels: The label applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
UDM Mapping Table
| Log field | UDM mapping | Logic |
|---|---|---|
| Level | security_result.severity | The value of the "Level" field is mapped to the corresponding UDM severity level: "INFO", "ALL", "OFF", "TRACE", and "DEBUG" are mapped to "INFORMATIONAL"; "WARN" is mapped to "LOW"; "ERROR" is mapped to "ERROR"; "FATAL" is mapped to "CRITICAL". |
| message.@timestamp | timestamp | The timestamp is parsed from the "@timestamp" field within the "message" field of the raw log, using the format "yyyy-MM-ddTHH:mm:ss.SSS". |
| message.action | security_result.action_details | Value is taken from the "action" field within the "message" field of the raw log. |
| message.event.action | security_result.summary | Value is taken from the "event.action" field within the "message" field of the raw log. |
| message.event.type | metadata.product_event_type | Value is taken from the "event.type" field within the "message" field of the raw log. |
| message.host.ip | target.ip | Value is taken from the "host.ip" field within the "message" field of the raw log. |
| message.host.name | target.hostname | Value is taken from the "host.name" field within the "message" field of the raw log. |
| message.indices | target.labels.value | Value is taken from the "indices" field within the "message" field of the raw log. |
| message.mrId | target.hostname | Value is taken from the "mrId" field within the "message" field of the raw log. |
| message.node.id | principal.asset.product_object_id | Value is taken from the "node.id" field within the "message" field of the raw log. |
| message.node.name | target.asset.hostname | Value is taken from the "node.name" field within the "message" field of the raw log. |
| message.origin.address | principal.ip | The IP address is extracted from the "origin.address" field within the "message" field of the raw log, by removing the port number. |
| message.origin.type | principal.resource.resource_subtype | Value is taken from the "origin.type" field within the "message" field of the raw log. |
| message.properties.host_group | principal.hostname | Value is taken from the "properties.host_group" field within the "message" field of the raw log. |
| message.properties.host_group | target.group.group_display_name | Value is taken from the "properties.host_group" field within the "message" field of the raw log. |
| message.request.id | target.resource.product_object_id | Value is taken from the "request.id" field within the "message" field of the raw log. |
| message.request.name | target.resource.name | Value is taken from the "request.name" field within the "message" field of the raw log. |
| message.user.name | principal.user.userid | Value is taken from the "user.name" field within the "message" field of the raw log. |
| message.user.realm | principal.user.attribute.permissions.name | Value is taken from the "user.realm" field within the "message" field of the raw log. |
| message.user.roles | about.user.attribute.roles.name | Value is taken from the "user.roles" field within the "message" field of the raw log. |
| | metadata.event_type | Hardcoded value: "USER_RESOURCE_ACCESS" |
| | metadata.log_type | Hardcoded value: "ELASTIC_SEARCH" |
| | metadata.product_name | Hardcoded value: "ELASTICSEARCH" |
| | metadata.vendor_name | Hardcoded value: "ELASTIC" |
| | principal.port | The port number is extracted from the "origin.address" field within the "message" field of the raw log. |
| | target.labels.key | Hardcoded value: "Indice" |
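The severity mapping and the "origin.address" split documented in the table can be mirrored in a few lines, which is useful for sanity-checking sample audit events before ingestion. A sketch of the documented logic (the helper names are illustrative, and the case-insensitive lookup is an assumption, not confirmed parser behavior):

```python
# "Level" -> security_result.severity, per the mapping table.
SEVERITY_MAP = {
    "INFO": "INFORMATIONAL", "ALL": "INFORMATIONAL", "OFF": "INFORMATIONAL",
    "TRACE": "INFORMATIONAL", "DEBUG": "INFORMATIONAL",
    "WARN": "LOW", "ERROR": "ERROR", "FATAL": "CRITICAL",
}

def map_severity(level: str):
    """Return the UDM severity for a raw "Level" value, or None if unmapped."""
    return SEVERITY_MAP.get(level.upper())

def split_origin_address(address: str):
    """Split an "origin.address" value such as "10.0.0.5:9300" into the
    (principal.ip, principal.port) parts described in the table. Returns
    (address, None) when no port suffix is present."""
    host, sep, port = address.rpartition(":")
    if sep and port.isdigit():
        return host, int(port)
    return address, None
```

For example, an event with `Level: "WARN"` and `origin.address: "10.0.0.5:9300"` should yield severity "LOW", principal.ip "10.0.0.5", and principal.port 9300.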
Need more help? Get answers from Community members and Google SecOps professionals.

