Collect DigiCert audit logs
This document explains how to ingest DigiCert audit logs to Google Security Operations using Amazon S3.
Before you begin
- Google SecOps instance
- Privileged access to DigiCert CertCentral (API key with Administrator role)
- Privileged access to AWS (S3, IAM, Lambda, EventBridge)
Get DigiCert API key and report ID
- In CertCentral, go to Account > API Keys and create the API key (X-DC-DEVKEY).
- In Reports > Report Library, create an Audit log report with JSON format and note its Report ID (UUID).
- You can also find an existing report's ID using the report history, as shown in the sketch below.
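If you prefer to look the Report ID up with the API rather than the UI, the following minimal sketch queries the same /report/history endpoint that the Lambda function later in this document uses. It assumes your API key is exported as DIGICERT_API_KEY; the response field names are taken from the Lambda code below.

```python
#!/usr/bin/env python3
"""Sketch: list recent report runs to find a Report ID (UUID)."""
import json
import os
from urllib import request

API_KEY = os.environ["DIGICERT_API_KEY"]  # the X-DC-DEVKEY value created above

req = request.Request(
    "https://api.digicert.com/reports/v1/report/history?limit=20",
    headers={"X-DC-DEVKEY": API_KEY, "Content-Type": "application/json"},
)
with request.urlopen(req, timeout=30) as resp:
    history = json.loads(resp.read().decode("utf-8"))

# Print each run's identifier; audit-log reports carry report_type "audit-logs".
for item in history.get("report_history", []):
    print(item.get("report_type"), item.get("report_identifier"), item.get("report_start_date"))
```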
Configure AWS S3 bucket and IAM for Google SecOps
- Create an Amazon S3 bucket following this user guide: Creating a bucket.
- Save the bucket Name and Region for future reference (for example, digicert-logs).
- Create a user following this user guide: Creating an IAM user.
- Select the created User.
- Select the Security credentials tab.
- Click Create access key in the Access keys section.
- Select Third-party service as the Use case.
- Click Next.
- Optional: add a description tag.
- Click Create access key.
- Click Download CSV file to save the Access Key and Secret Access Key for later use.
- Click Done.
- Select the Permissions tab.
- Click Add permissions in the Permissions policies section.
- Select Add permissions.
- Select Attach policies directly.
- Search for and select the AmazonS3FullAccess policy.
- Click Next.
- Click Add permissions.
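Optionally, you can smoke-test the new key pair before moving on. This is a minimal sketch: the bucket name digicert-logs, the region, and the key placeholders are examples from this guide, not required values.

```python
#!/usr/bin/env python3
"""Sketch: verify the new access key pair can write to and read from the bucket."""
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIA...",      # Access Key from the downloaded CSV
    aws_secret_access_key="...",      # Secret Access Key from the CSV
    region_name="us-east-1",          # your bucket's region
)

# Write and read back a marker object to confirm the permissions work.
s3.put_object(Bucket="digicert-logs", Key="digicert/logs/ping.txt", Body=b"ok")
print(s3.get_object(Bucket="digicert-logs", Key="digicert/logs/ping.txt")["Body"].read())
```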
Configure the IAM policy and role for S3 uploads
- Go to AWS console > IAM > Policies > Create policy > JSON tab.
- Enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowPutDigiCertObjects",
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::digicert-logs/*"
    },
    {
      "Sid": "AllowGetStateObject",
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"
    }
  ]
}
```
- Replace digicert-logs if you entered a different bucket name.
- Click Next > Create policy.
- Go to IAM > Roles > Create role > AWS service > Lambda.
- Attach the newly created policy.
- Name the role WriteDigicertToS3Role and click Create role.
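If you script your AWS setup instead of using the console, the same policy and role can be created with boto3, as sketched below. The policy name digicert-logs-writer is a hypothetical choice; in practice you may also want to attach the AWS-managed AWSLambdaBasicExecutionRole policy so the function can write CloudWatch logs.

```python
#!/usr/bin/env python3
"""Sketch: create the S3 write policy and Lambda execution role with boto3."""
import json
import boto3

iam = boto3.client("iam")

policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {"Sid": "AllowPutDigiCertObjects", "Effect": "Allow",
         "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*"},
        {"Sid": "AllowGetStateObject", "Effect": "Allow",
         "Action": ["s3:GetObject"],
         "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json"},
    ],
}
policy_arn = iam.create_policy(
    PolicyName="digicert-logs-writer",  # hypothetical policy name
    PolicyDocument=json.dumps(policy_doc),
)["Policy"]["Arn"]

# Trust policy that lets the Lambda service assume the role.
trust = {"Version": "2012-10-17", "Statement": [{"Effect": "Allow",
         "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}
iam.create_role(RoleName="WriteDigicertToS3Role", AssumeRolePolicyDocument=json.dumps(trust))
iam.attach_role_policy(RoleName="WriteDigicertToS3Role", PolicyArn=policy_arn)
```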
Create the Lambda function
- In the AWS Console, go to Lambda > Functions > Create function.
- Click Author from scratch.
- Provide the following configuration details:

| Setting | Value |
| --- | --- |
| Name | digicert_audit_logs_to_s3 |
| Runtime | Python 3.13 |
| Architecture | x86_64 |
| Execution role | WriteDigicertToS3Role |
- After the function is created, open the Code tab, delete the stub, and enter the following code (digicert_audit_logs_to_s3.py):

```python
#!/usr/bin/env python3
"""Pull a DigiCert audit-log report run and write it to S3 as gzipped NDJSON."""
import datetime as dt
import gzip
import io
import json
import os
import time
import uuid
import zipfile
from typing import Any, Dict, Iterable, List, Tuple
from urllib import request, parse, error

import boto3
from botocore.exceptions import ClientError

API_BASE = "https://api.digicert.com/reports/v1"
USER_AGENT = "secops-digicert-reports/1.0"

s3 = boto3.client("s3")


def _now() -> dt.datetime:
    return dt.datetime.now(dt.timezone.utc)


def _http(method: str, url: str, api_key: str, body: bytes | None = None,
          timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str, str], bytes]:
    """HTTP helper with exponential backoff for 429 and 5xx responses."""
    headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json",
               "User-Agent": USER_AGENT}
    attempt, backoff = 0, 1.0
    while True:
        req = request.Request(url=url, method=method, headers=headers, data=body)
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()}
                data = resp.read()
                if 500 <= status <= 599 and attempt < max_retries:
                    attempt += 1
                    time.sleep(backoff)
                    backoff *= 2
                    continue
                return status, h, data
        except error.HTTPError as e:
            status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()}
            if status == 429 and attempt < max_retries:
                # Honor Retry-After when present, otherwise back off exponentially.
                ra = h.get("retry-after")
                delay = float(ra) if ra and ra.isdigit() else backoff
                attempt += 1
                time.sleep(delay)
                backoff *= 2
                continue
            if 500 <= status <= 599 and attempt < max_retries:
                attempt += 1
                time.sleep(backoff)
                backoff *= 2
                continue
            raise
        except error.URLError:
            if attempt < max_retries:
                attempt += 1
                time.sleep(backoff)
                backoff *= 2
                continue
            raise


def start_report_run(api_key: str, report_id: str, timeout: int) -> None:
    """Trigger an on-demand run of the audit-log report."""
    st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout)
    if st not in (200, 201):
        raise RuntimeError(f"Start run failed: {st} {body[:200]!r}")


def list_report_history(api_key: str, *, status_filter: str | None = None,
                        report_type: str | None = None, limit: int = 100,
                        sort_by: str = "report_start_date", sort_direction: str = "DESC",
                        timeout: int = 30, offset: int = 0) -> Dict[str, Any]:
    qs = {"limit": str(limit), "offset": str(offset),
          "sort_by": sort_by, "sort_direction": sort_direction}
    if status_filter:
        qs["status"] = status_filter
    if report_type:
        qs["report_type"] = report_type
    st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}",
                        api_key, timeout=timeout)
    if st != 200:
        raise RuntimeError(f"History failed: {st} {body[:200]!r}")
    return json.loads(body.decode("utf-8"))


def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime,
                   timeout: int, max_wait_seconds: int, poll_interval: int) -> str:
    """Poll report history until a READY run newer than our trigger appears."""
    deadline = time.time() + max_wait_seconds
    while time.time() < deadline:
        hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs",
                                   limit=200, timeout=timeout).get("report_history", [])
        for it in hist:
            if it.get("report_identifier") != report_id or not it.get("report_run_identifier"):
                continue
            try:
                rsd = dt.datetime.strptime(it.get("report_start_date", ""),
                                           "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc)
            except Exception:
                rsd = started_not_before
            if rsd + dt.timedelta(seconds=60) >= started_not_before:
                return it["report_run_identifier"]
        time.sleep(poll_interval)
    raise TimeoutError("READY run not found in time")


def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]:
    """Download a run's JSON result, transparently unpacking a ZIP if one is returned."""
    st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json",
                        api_key, timeout=timeout)
    if st != 200:
        raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}")
    if "application/zip" in h.get("content-type", "").lower() or body[:2] == b"PK":
        with zipfile.ZipFile(io.BytesIO(body)) as zf:
            name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None)
            if not name:
                raise RuntimeError("ZIP has no JSON")
            rows = json.loads(zf.read(name).decode("utf-8"))
    else:
        rows = json.loads(body.decode("utf-8"))
    if not isinstance(rows, list):
        raise RuntimeError("Unexpected JSON format")
    return rows


def load_state(bucket: str, key: str) -> Dict[str, Any]:
    try:
        return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
    except ClientError as e:
        if e.response["Error"]["Code"] in ("NoSuchKey", "404"):
            return {}
        raise


def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None:
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"),
                  ContentType="application/json")


def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str:
    """Write rows as gzip-compressed NDJSON under a time-partitioned key."""
    ts = _now().strftime("%Y/%m/%d/%H%M%S")
    key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz"
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for r in rows:
            gz.write((json.dumps(r, separators=(",", ":")) + "\n").encode("utf-8"))
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(),
                  ContentType="application/x-ndjson", ContentEncoding="gzip")
    return key


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    api_key = os.environ["DIGICERT_API_KEY"]
    report_id = os.environ["DIGICERT_REPORT_ID"]
    bucket = os.environ["S3_BUCKET"]
    prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/")
    state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json")
    max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300"))
    poll_int = int(os.environ.get("POLL_INTERVAL", "10"))
    timeout = int(os.environ.get("REQUEST_TIMEOUT", "30"))

    state = load_state(bucket, state_key) if state_key else {}
    last_run = state.get("last_run_id")

    started = _now()
    start_report_run(api_key, report_id, timeout)
    run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int)
    if last_run and last_run == run_id:
        # The latest READY run was already ingested; skip to avoid duplicates.
        return {"status": "skip", "report_run_identifier": run_id}

    rows = get_json_rows(api_key, report_id, run_id, timeout)
    key = write_ndjson_gz(bucket, prefix, rows, run_id)
    if state_key:
        save_state(bucket, state_key, {"last_run_id": run_id,
                                       "last_success_at": _now().isoformat(),
                                       "last_s3_key": key,
                                       "rows_count": len(rows)})
    return {"status": "ok", "report_identifier": report_id,
            "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
```
- Go to Configuration > Environment variables > Edit > Add new environment variable.
- Enter the following environment variables, replacing the example values with your own (they can also be set programmatically; see the sketch after these steps):

| Key | Example |
| --- | --- |
| S3_BUCKET | digicert-logs |
| S3_PREFIX | digicert/logs/ |
| STATE_KEY | digicert/logs/state.json |
| DIGICERT_API_KEY | xxxxxxxxxxxxxxxxxxxxxxxx |
| DIGICERT_REPORT_ID | 88de5e19-ec57-4d70-865d-df953b062574 |
| REQUEST_TIMEOUT | 30 |
| POLL_INTERVAL | 10 |
| MAX_WAIT_SECONDS | 300 |
- After the function is created, stay on its page (or open Lambda > Functions > your-function).
- Select the Configuration tab.
- In the General configuration panel, click Edit.
- Change Timeout to 15 minutes (900 seconds) and click Save.
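As referenced in the environment-variable step, the same configuration can be applied with boto3. This is a sketch using the example values from the table above; substitute your own API key and report ID.

```python
#!/usr/bin/env python3
"""Sketch: apply the environment variables and 15-minute timeout programmatically."""
import boto3

lam = boto3.client("lambda")
lam.update_function_configuration(
    FunctionName="digicert_audit_logs_to_s3",
    Timeout=900,  # 15 minutes, as set in the console steps above
    Environment={"Variables": {
        "S3_BUCKET": "digicert-logs",
        "S3_PREFIX": "digicert/logs/",
        "STATE_KEY": "digicert/logs/state.json",
        "DIGICERT_API_KEY": "xxxxxxxxxxxxxxxxxxxxxxxx",  # replace with your key
        "DIGICERT_REPORT_ID": "88de5e19-ec57-4d70-865d-df953b062574",
        "REQUEST_TIMEOUT": "30",
        "POLL_INTERVAL": "10",
        "MAX_WAIT_SECONDS": "300",
    }},
)
```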
Create an EventBridge schedule
- Go to Amazon EventBridge > Scheduler > Create schedule.
- Provide the following configuration details:
  - Recurring schedule: Rate (1 hour).
  - Target: your Lambda function.
  - Name: digicert-audit-1h.
- Click Create schedule.
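Equivalently, the schedule can be created with boto3, as sketched below. The Lambda and role ARNs are placeholders; EventBridge Scheduler needs an IAM role it can assume that allows lambda:InvokeFunction on your function.

```python
#!/usr/bin/env python3
"""Sketch: create the hourly schedule with boto3 (ARNs are placeholders)."""
import boto3

scheduler = boto3.client("scheduler")
scheduler.create_schedule(
    Name="digicert-audit-1h",
    ScheduleExpression="rate(1 hour)",
    FlexibleTimeWindow={"Mode": "OFF"},
    Target={
        "Arn": "arn:aws:lambda:us-east-1:123456789012:function:digicert_audit_logs_to_s3",
        # Role that EventBridge Scheduler assumes; it needs lambda:InvokeFunction.
        "RoleArn": "arn:aws:iam::123456789012:role/scheduler-invoke-digicert-lambda",
    },
)
```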
Optional: Create read-only IAM user & keys for Google SecOps
- In the AWS Console, go to IAM > Users, then click Add users.
- Provide the following configuration details:
  - User: Enter a unique name (for example, secops-reader).
  - Access type: Select Access key - Programmatic access.
- Click Create user.
- Attach a minimal read policy (custom): go to Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
- In the JSON editor, enter the following policy:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::<your-bucket>/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::<your-bucket>"
    }
  ]
}
```
- Name the policy secops-reader-policy.
- Click Create policy > search/select > Next > Add permissions.
- Create an access key for secops-reader: Security credentials > Access keys > Create access key.
- Download the CSV file (these values are entered into the feed).
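To confirm the read-only credentials work before configuring the feed, a minimal boto3 check like the following can help; the bucket name and key placeholders are examples from this guide.

```python
#!/usr/bin/env python3
"""Sketch: confirm the secops-reader keys can list and fetch objects."""
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="AKIA...",      # secops-reader Access Key from the CSV
    aws_secret_access_key="...",      # secops-reader Secret Access Key
)

# List a few objects under the log prefix to verify s3:ListBucket and s3:GetObject.
resp = s3.list_objects_v2(Bucket="digicert-logs", Prefix="digicert/logs/", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```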
Configure a feed in Google SecOps to ingest DigiCert logs
- Go to SIEM Settings > Feeds.
- Click Add New Feed.
- In the Feed name field, enter a name for the feed (for example, DigiCert Audit Logs).
- Select Amazon S3 V2 as the Source type.
- Select Digicert as the Log type.
- Click Next.
- Specify values for the following input parameters:
  - S3 URI: s3://digicert-logs/digicert/logs/
  - Source deletion options: Select the deletion option according to your preference.
  - Maximum File Age: Default 180 Days.
  - Access Key ID: User access key with access to the S3 bucket.
  - Secret Access Key: User secret key with access to the S3 bucket.
  - Asset namespace: The asset namespace.
  - Ingestion labels: The label to be applied to the events from this feed.
- Click Next.
- Review your new feed configuration in the Finalize screen, and then click Submit.
Need more help? Get answers from Community members and Google SecOps professionals.