Collect Censys logs
This document explains how to ingest Censys logs to Google Security Operations using Amazon S3. Censys provides comprehensive attack surface management and internet intelligence through its API. This integration lets you collect host discovery events, risk events, and asset changes from Censys ASM and forward them to Google SecOps for analysis and monitoring. The parser transforms raw logs into a structured format conforming to the Google SecOps UDM. It extracts fields from the raw log message, performs data type conversions, and maps the extracted information to corresponding UDM fields, enriching the data with additional context and labels.
Before you begin
Make sure you have the following prerequisites:
- Google SecOps instance
- Privileged access to Censys ASM
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Collect Censys prerequisites (API credentials)
- Sign in to the Censys ASM Console at app.censys.io.
- Go to Integrations at the top of the page.
- Copy and save your API Key and Organization ID.
- Note the API Base URL: https://api.platform.censys.io.
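You can verify the key and Organization ID before wiring them into AWS. The sketch below is an illustrative helper, not part of the integration; it sends an authenticated request to the logbook endpoint using the same headers the Lambda function in this guide builds:

```python
def build_censys_headers(api_key: str, org_id: str) -> dict:
    """Headers the Censys Platform API expects on every request."""
    return {
        'Authorization': f'Bearer {api_key}',
        'X-Organization-ID': org_id,
        'Content-Type': 'application/json',
    }

def check_credentials(api_key: str, org_id: str,
                      api_base: str = 'https://api.platform.censys.io') -> bool:
    """Return True if the logbook endpoint accepts the key/org pair."""
    import urllib3  # imported here so the headers helper has no dependencies
    http = urllib3.PoolManager()
    resp = http.request('GET', f'{api_base}/v3/logbook',
                        headers=build_censys_headers(api_key, org_id))
    return resp.status == 200

# Example (requires valid credentials):
# check_credentials('<your-censys-api-key>', '<your-organization-id>')
```

A 401 or 403 response here means the key or Organization ID is wrong; fix that before deploying the Lambda function.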
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket
- Save the bucket Name and Region for future reference (for example, censys-logs).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create Access Key in the Access Keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
Configure the IAM policy and role for S3 uploads
- In the AWS console, go to IAM > Policies > Create policy > JSON tab.
- Enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutObjects",
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::censys-logs/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::censys-logs/censys/state.json"
    }
  ]
}
```
- Replace censys-logs if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy and the AWSLambdaBasicExecutionRole managed policy (for CloudWatch Logs access).
- Name the role censys-lambda-role and click Create role.
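If you manage IAM with scripts rather than the console, the policy above can be rendered for any bucket name. A minimal sketch (the build_lambda_policy helper is hypothetical; the default bucket and state-key values are the ones used in this guide):

```python
import json

def build_lambda_policy(bucket: str, state_key: str = 'censys/state.json') -> str:
    """Render the S3 upload policy from this guide for a given bucket name."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutObjects",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                "Sid": "AllowGetStateObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{state_key}",
            },
        ],
    }
    return json.dumps(policy, indent=2)

print(build_lambda_policy('censys-logs'))
```

The output can be pasted into the JSON tab or passed to an infrastructure-as-code tool.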
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:
Setting | Value |
---|---|
Name | censys-data-collector |
Runtime | Python 3.13 |
Architecture | x86_64 |
Execution role | censys-lambda-role |
- After the function is created, open the Code tab, delete the stub, and enter the following code (censys-data-collector.py):

```python
import json
import boto3
import urllib3
import gzip
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional
from urllib.parse import urlencode

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS S3 client
s3_client = boto3.client('s3')

# HTTP client
http = urllib3.PoolManager()

# Environment variables
S3_BUCKET = os.environ['S3_BUCKET']
S3_PREFIX = os.environ['S3_PREFIX']
STATE_KEY = os.environ['STATE_KEY']
CENSYS_API_KEY = os.environ['CENSYS_API_KEY']
CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID']
API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io')


class CensysCollector:
    def __init__(self):
        self.headers = {
            'Authorization': f'Bearer {CENSYS_API_KEY}',
            'X-Organization-ID': CENSYS_ORG_ID,
            'Content-Type': 'application/json'
        }

    def get_last_collection_time(self) -> Optional[datetime]:
        """Get the last collection timestamp from S3 state file."""
        try:
            response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            state = json.loads(response['Body'].read().decode('utf-8'))
            return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z'))
        except Exception as e:
            logger.info(f"No state file found or error reading state: {e}")
            return datetime.now(timezone.utc) - timedelta(hours=1)

    def save_collection_time(self, collection_time: datetime):
        """Save the current collection timestamp to S3 state file."""
        state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')}
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=STATE_KEY,
            Body=json.dumps(state),
            ContentType='application/json'
        )

    def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]:
        """Collect logbook events from Censys ASM API using cursor-based pagination."""
        events = []
        url = f"{API_BASE}/v3/logbook"

        # Use cursor-based pagination as per Censys API documentation
        params = {}
        if cursor:
            params['cursor'] = cursor

        try:
            query_string = urlencode(params) if params else ''
            full_url = f"{url}?{query_string}" if query_string else url

            response = http.request('GET', full_url, headers=self.headers)

            if response.status != 200:
                logger.error(f"API request failed with status {response.status}: {response.data}")
                return []

            data = json.loads(response.data.decode('utf-8'))
            events.extend(data.get('logbook_entries', []))

            # Handle cursor-based pagination
            next_cursor = data.get('next_cursor')
            if next_cursor:
                events.extend(self.collect_logbook_events(next_cursor))

            logger.info(f"Collected {len(events)} logbook events")
            return events

        except Exception as e:
            logger.error(f"Error collecting logbook events: {e}")
            return []

    def collect_risks_events(self) -> List[Dict[str, Any]]:
        """Collect risk events from Censys ASM API."""
        events = []
        url = f"{API_BASE}/v3/risks"

        try:
            response = http.request('GET', url, headers=self.headers)

            if response.status != 200:
                logger.error(f"API request failed with status {response.status}: {response.data}")
                return []

            data = json.loads(response.data.decode('utf-8'))
            events.extend(data.get('risks', []))

            logger.info(f"Collected {len(events)} risk events")
            return events

        except Exception as e:
            logger.error(f"Error collecting risk events: {e}")
            return []

    def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str):
        """Save events to S3 in compressed NDJSON format."""
        if not events:
            return

        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz"

        try:
            # Convert events to newline-delimited JSON
            ndjson_content = '\n'.join(json.dumps(event, separators=(',', ':')) for event in events)

            # Compress with gzip
            gz_bytes = gzip.compress(ndjson_content.encode('utf-8'))

            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=filename,
                Body=gz_bytes,
                ContentType='application/gzip',
                ContentEncoding='gzip'
            )

            logger.info(f"Saved {len(events)} {event_type} events to {filename}")

        except Exception as e:
            logger.error(f"Error saving {event_type} events to S3: {e}")
            raise


def lambda_handler(event, context):
    """AWS Lambda handler function."""
    try:
        collector = CensysCollector()

        # Get last collection time for cursor state management
        last_collection_time = collector.get_last_collection_time()
        current_time = datetime.now(timezone.utc)

        logger.info(f"Collecting events since {last_collection_time}")

        # Collect different types of events
        logbook_events = collector.collect_logbook_events()
        risk_events = collector.collect_risks_events()

        # Save events to S3
        collector.save_events_to_s3(logbook_events, 'logbook')
        collector.save_events_to_s3(risk_events, 'risks')

        # Update state
        collector.save_collection_time(current_time)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Censys data collection completed successfully',
                'logbook_events': len(logbook_events),
                'risk_events': len(risk_events),
                'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
            })
        }

    except Exception as e:
        logger.error(f"Lambda execution failed: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
```
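The collector writes gzip-compressed newline-delimited JSON, which is the format the Amazon S3 feed reads back. A small sketch of that round trip, useful for spot-checking a downloaded object (encode_events and decode_events are illustrative helpers mirroring save_events_to_s3, not part of the function):

```python
import gzip
import json

def encode_events(events: list) -> bytes:
    """Compress events the way save_events_to_s3 does: NDJSON, then gzip."""
    ndjson = '\n'.join(json.dumps(e, separators=(',', ':')) for e in events)
    return gzip.compress(ndjson.encode('utf-8'))

def decode_events(blob: bytes) -> list:
    """Inverse of encode_events: decompress and parse one event per line."""
    text = gzip.decompress(blob).decode('utf-8')
    return [json.loads(line) for line in text.splitlines() if line]

# Round trip: what goes into the bucket comes back out unchanged.
sample = [{'type': 'HOST', 'assetId': '203.0.113.10'}, {'type': 'RISK'}]
blob = encode_events(sample)
print(decode_events(blob) == sample)  # prints True
```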
- Go to Configuration > Environment variables > Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own:

Key | Example value |
---|---|
S3_BUCKET | censys-logs |
S3_PREFIX | censys/ |
STATE_KEY | censys/state.json |
CENSYS_API_KEY | <your-censys-api-key> |
CENSYS_ORG_ID | <your-organization-id> |
API_BASE | https://api.platform.censys.io |
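The function reads five of these variables with os.environ[...] and raises KeyError at import time if any is missing; only API_BASE has a default. A pre-flight sketch that mirrors that contract (missing_vars is a hypothetical helper, not part of the deployed code):

```python
REQUIRED = ['S3_BUCKET', 'S3_PREFIX', 'STATE_KEY', 'CENSYS_API_KEY', 'CENSYS_ORG_ID']

def missing_vars(env: dict) -> list:
    """Names of required variables that are absent or empty in the mapping."""
    return [name for name in REQUIRED if not env.get(name)]

# API_BASE is optional (the function defaults it); everything else is not.
print(missing_vars({'S3_BUCKET': 'censys-logs'}))
```

Run it against os.environ (or your deployment config) before saving the Lambda configuration.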
- After the function is created, stay on its page (or open Lambda > Functions > your-function).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 5 minutes (300 seconds) and click Save.
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Recurring schedule: Rate (1 hour).
  - Target: your Lambda function censys-data-collector.
  - Name: censys-data-collector-1h.
- Click Create schedule.
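The same schedule can be created through the EventBridge Scheduler API. A hedged sketch (create_hourly_schedule and its role_arn parameter are assumptions; the role must allow scheduler.amazonaws.com to invoke the function):

```python
def rate_expression(hours: int) -> str:
    """EventBridge Scheduler rate() expression; the unit is singular for 1."""
    unit = 'hour' if hours == 1 else 'hours'
    return f'rate({hours} {unit})'

def create_hourly_schedule(function_arn: str, role_arn: str) -> None:
    """Create the schedule from this guide via the API (requires AWS credentials)."""
    import boto3  # imported here so rate_expression stays dependency-free
    scheduler = boto3.client('scheduler')
    scheduler.create_schedule(
        Name='censys-data-collector-1h',
        ScheduleExpression=rate_expression(1),
        FlexibleTimeWindow={'Mode': 'OFF'},
        Target={'Arn': function_arn, 'RoleArn': role_arn},
    )
```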
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users > Add users.
- Click Add users.
- Provide the following configuration details:
  - User: secops-reader.
  - Access type: Access key — Programmatic access.
- Click Create user.
- Attach minimal read policy (custom): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::censys-logs/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::censys-logs"
    }
  ]
}
```
- Set the name to secops-reader-policy.
- Go to Create policy > search/select > Next > Add permissions.
- Go to Security credentials > Access keys > Create access key.
- Download the CSV file (these values are entered into the feed).
Configure a feed in Google SecOps to ingest Censys logs
- Go to SIEM Settings > Feeds.
- Click + Add New Feed.
- In the Feed name field, enter a name for the feed (for example, Censys logs).
- Select Amazon S3 V2 as the Source type.
- Select CENSYS as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: s3://censys-logs/censys/
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Include files modified within the last number of days. The default is 180 days.
  - Access Key ID: The user access key with access to the S3 bucket.
  - Secret Access Key: The user secret key with access to the S3 bucket.
  - Asset namespace: The asset namespace.
  - Ingestion labels: The label applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
UDM mapping table
Log field | UDM mapping | Logic |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | If the assetId field is not an IP address, it is mapped to principal.asset.hostname. |
assetId | read_only_udm.principal.asset.ip | If the assetId field is an IP address, it is mapped to principal.asset.ip. |
assetId | read_only_udm.principal.hostname | If the assetId field is not an IP address, it is mapped to principal.hostname. |
assetId | read_only_udm.principal.ip | If the assetId field is an IP address, it is mapped to principal.ip. |
associatedAt | read_only_udm.security_result.detection_fields.value | The associatedAt field is mapped to security_result.detection_fields.value. |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | The autonomousSystem.asn field is converted to a string and mapped to additional.fields.value.string_value with key "autonomousSystem_asn". |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | The autonomousSystem.bgpPrefix field is mapped to additional.fields.value.string_value with key "autonomousSystem_bgpPrefix". |
banner | read_only_udm.principal.resource.attribute.labels.value | The banner field is mapped to principal.resource.attribute.labels.value with key "banner". |
cloud | read_only_udm.metadata.vendor_name | The cloud field is mapped to metadata.vendor_name. |
comments.refUrl | read_only_udm.network.http.referral_url | The comments.refUrl field is mapped to network.http.referral_url. |
data.cve | read_only_udm.additional.fields.value.string_value | The data.cve field is mapped to additional.fields.value.string_value with key "data_cve". |
data.cvss | read_only_udm.additional.fields.value.string_value | The data.cvss field is mapped to additional.fields.value.string_value with key "data_cvss". |
data.ipAddress | read_only_udm.principal.asset.ip | If the data.ipAddress field is not equal to the assetId field, it is mapped to principal.asset.ip. |
data.ipAddress | read_only_udm.principal.ip | If the data.ipAddress field is not equal to the assetId field, it is mapped to principal.ip. |
data.location.city | read_only_udm.principal.location.city | If the location.city field is empty, the data.location.city field is mapped to principal.location.city. |
data.location.countryCode | read_only_udm.principal.location.country_or_region | If the location.country field is empty, the data.location.countryCode field is mapped to principal.location.country_or_region. |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | If the location.coordinates.latitude and location.geoCoordinates.latitude fields are empty, the data.location.latitude field is converted to a float and mapped to principal.location.region_coordinates.latitude. |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | If the location.coordinates.longitude and location.geoCoordinates.longitude fields are empty, the data.location.longitude field is converted to a float and mapped to principal.location.region_coordinates.longitude. |
data.location.province | read_only_udm.principal.location.state | If the location.province field is empty, the data.location.province field is mapped to principal.location.state. |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | Each element in the data.mailServers array is mapped to a separate additional.fields entry with key "Mail Servers" and value.list_value.values.string_value set to the element value. |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | Each element in the data.names.forwardDns array is mapped to a separate network.dns.questions entry with the name field set to the element's name field. |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | Each element in the data.nameServers array is mapped to a separate additional.fields entry with key "Name nameServers" and value.list_value.values.string_value set to the element value. |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | If the data.protocols[].transportProtocol field is one of TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP, or VRRP, it is mapped to network.ip_protocol. |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | The data.protocols[].transportProtocol field is mapped to principal.resource.attribute.labels.value with key "data_protocols {index}". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | If the http.request.headers[].key field is "User-Agent", the corresponding http.request.headers[].value.headers.0 field is mapped to network.http.user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | If the http.request.headers[].key field is "User-Agent", the corresponding http.request.headers[].value.headers.0 field is parsed as a user agent string and mapped to network.http.parsed_user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | For each element in the http.request.headers array, the key field is mapped to principal.resource.attribute.labels.key and the value.headers.0 field is mapped to principal.resource.attribute.labels.value. |
http.request.uri | read_only_udm.principal.asset.hostname | The hostname part of the http.request.uri field is extracted and mapped to principal.asset.hostname. |
http.request.uri | read_only_udm.principal.hostname | The hostname part of the http.request.uri field is extracted and mapped to principal.hostname. |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | The http.response.body field is mapped to principal.resource.attribute.labels.value with key "http_response_body". |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | If the http.response.headers[].key field is "Server", the corresponding http.response.headers[].value.headers.0 field is mapped to target.hostname. |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | For each element in the http.response.headers array, the key field is mapped to principal.resource.attribute.labels.key and the value.headers.0 field is mapped to principal.resource.attribute.labels.value. |
http.response.statusCode | read_only_udm.network.http.response_code | The http.response.statusCode field is converted to an integer and mapped to network.http.response_code. |
ip | read_only_udm.target.asset.ip | The ip field is mapped to target.asset.ip. |
ip | read_only_udm.target.ip | The ip field is mapped to target.ip. |
isSeed | read_only_udm.additional.fields.value.string_value | The isSeed field is converted to a string and mapped to additional.fields.value.string_value with key "isSeed". |
location.city | read_only_udm.principal.location.city | The location.city field is mapped to principal.location.city. |
location.continent | read_only_udm.additional.fields.value.string_value | The location.continent field is mapped to additional.fields.value.string_value with key "location_continent". |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | The location.coordinates.latitude field is converted to a float and mapped to principal.location.region_coordinates.latitude. |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | The location.coordinates.longitude field is converted to a float and mapped to principal.location.region_coordinates.longitude. |
location.country | read_only_udm.principal.location.country_or_region | The location.country field is mapped to principal.location.country_or_region. |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | If the location.coordinates.latitude field is empty, the location.geoCoordinates.latitude field is converted to a float and mapped to principal.location.region_coordinates.latitude. |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | If the location.coordinates.longitude field is empty, the location.geoCoordinates.longitude field is converted to a float and mapped to principal.location.region_coordinates.longitude. |
location.postalCode | read_only_udm.additional.fields.value.string_value | The location.postalCode field is mapped to additional.fields.value.string_value with key "Postal code". |
location.province | read_only_udm.principal.location.state | The location.province field is mapped to principal.location.state. |
operation | read_only_udm.security_result.action_details | The operation field is mapped to security_result.action_details. |
perspectiveId | read_only_udm.principal.group.product_object_id | The perspectiveId field is mapped to principal.group.product_object_id. |
port | read_only_udm.principal.port | The port field is converted to an integer and mapped to principal.port. |
risks[].severity, risks[].title | read_only_udm.security_result.category_details | The risks[].severity field is concatenated with the risks[].title field and mapped to security_result.category_details. |
serviceName | read_only_udm.network.application_protocol | If the serviceName field is "HTTP" or "HTTPS", it is mapped to network.application_protocol. |
sourceIp | read_only_udm.principal.asset.ip | The sourceIp field is mapped to principal.asset.ip. |
sourceIp | read_only_udm.principal.ip | The sourceIp field is mapped to principal.ip. |
timestamp | read_only_udm.metadata.event_timestamp | The timestamp field is parsed as a timestamp and mapped to metadata.event_timestamp. |
transportFingerprint.id | read_only_udm.metadata.product_log_id | The transportFingerprint.id field is converted to a string and mapped to metadata.product_log_id. |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | The transportFingerprint.raw field is mapped to additional.fields.value.string_value with key "transportFingerprint_raw". |
type | read_only_udm.metadata.product_event_type | The type field is mapped to metadata.product_event_type. |
- | read_only_udm.metadata.product_name | The value "CENSYS_ASM" is assigned to metadata.product_name. |
- | read_only_udm.metadata.vendor_name | The value "CENSYS" is assigned to metadata.vendor_name. |
- | read_only_udm.metadata.event_type | The event type is determined based on the presence of specific fields: NETWORK_CONNECTION if has_princ_machine_id and has_target_machine are true and has_network_flow is false, NETWORK_DNS if has_network_flow is true, STATUS_UPDATE if has_princ_machine_id is true, and GENERIC_EVENT otherwise. |
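The four assetId rows encode one branch: the parser checks whether assetId parses as an IP address and routes it to the ip or hostname fields accordingly. An illustrative sketch of that logic (map_asset_id is hypothetical; Python's ipaddress module stands in for the parser's IP check):

```python
import ipaddress

def map_asset_id(asset_id: str) -> dict:
    """Route assetId to the ip or hostname UDM fields, as in the table above."""
    try:
        ipaddress.ip_address(asset_id)
        is_ip = True
    except ValueError:
        is_ip = False
    if is_ip:
        return {'principal.asset.ip': asset_id, 'principal.ip': asset_id}
    return {'principal.asset.hostname': asset_id, 'principal.hostname': asset_id}

print(map_asset_id('198.51.100.7'))     # IP branch
print(map_asset_id('www.example.com'))  # hostname branch
```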