Collect Fortra Digital Guardian (formerly Digital Guardian EDR) logs

This document explains how to ingest Fortra Digital Guardian (formerly Digital Guardian EDR) logs into Google Security Operations (Google SecOps). A Cloud Run function writes the logs to a Google Cloud Storage (GCS) bucket, and a Google Cloud Storage V2 feed reads them from there.

Fortra Digital Guardian is a comprehensive data loss prevention and endpoint detection and response platform that provides visibility into system, user, and data events across endpoints, networks, and cloud applications. The Analytics & Reporting Cloud (ARC) service delivers advanced analytics, workflow, and reporting capabilities for holistic data protection.

The Cloud Run function authenticates to the ARC Export API using OAuth 2.0, retrieves the export data, writes the results as NDJSON to a GCS bucket, and then acknowledges the bookmark so that the next run receives the next chunk. Google SecOps ingests the resulting files through a GCS V2 feed.
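
To make the NDJSON step concrete, the following minimal sketch shows how a list of export records can be turned into newline-delimited JSON, with one compact object per line. The `to_ndjson` helper name and the sample records are assumptions for illustration and are not actual ARC output.

```python
import json


def to_ndjson(records: list[dict]) -> str:
    """Serialize records as NDJSON: one compact JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


# Hypothetical records; real field names depend on your export profile.
sample = [
    {"Computer_Name": "host-01", "Severity": 3},
    {"Computer_Name": "host-02", "Severity": 1},
]

ndjson = to_ndjson(sample)
print(ndjson)
```

Each line of the output is a complete JSON document, so a reader can process the file one line at a time.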

Before you begin

Make sure that you have the following prerequisites:

  • A Google SecOps instance.
  • A Google Cloud project with the following APIs enabled:
    • Cloud Storage
    • Cloud Run functions
    • Cloud Scheduler
    • Pub/Sub
    • Cloud Build
  • Permissions to create and manage GCS buckets, Cloud Run functions, Pub/Sub topics, and Cloud Scheduler jobs.
  • Privileged access to the Digital Guardian Management Console (DGMC).
  • Access to Digital Guardian Analytics & Reporting Cloud (ARC) Tenant Settings.
  • Administrator permissions to configure Cloud Services in DGMC.
  • An Export Profile created in DGMC with a valid GUID.

Create a Google Cloud Storage bucket

  1. Go to the Google Cloud console.
  2. Select your project or create a new one.
  3. In the navigation menu, go to Cloud Storage > Buckets.
  4. Click Create bucket.
  5. Provide the following configuration details:

    Setting            Value
    Name your bucket   Enter a globally unique name (e.g., digitalguardian-edr-logs)
    Location type      Choose based on your needs (Region, Dual-region, Multi-region)
    Location           Select the location closest to your instance (e.g., us-central1)
    Storage class      Standard (recommended for frequently accessed logs)
    Access control     Uniform (recommended)
    Protection tools   Optional: Enable object versioning or retention policy
  6. Click Create.
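
The same bucket can also be created programmatically. The sketch below assumes the `google-cloud-storage` client library and Application Default Credentials. The `create_log_bucket` helper name is hypothetical, and the bucket name and location in the usage comment are the example values from the configuration step.

```python
def create_log_bucket(client, name: str, location: str):
    """Create a Standard-class bucket with uniform bucket-level access.

    `client` is expected to be a google.cloud.storage.Client instance.
    """
    bucket = client.bucket(name)
    bucket.storage_class = "STANDARD"
    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    return client.create_bucket(bucket, location=location)


# Usage (requires google-cloud-storage and valid credentials):
#   from google.cloud import storage
#   create_log_bucket(storage.Client(), "digitalguardian-edr-logs", "us-central1")
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub before running it against a real project.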

Collect Digital Guardian API credentials

Obtain API credentials

Get Client ID and Client Secret from ARC Tenant Settings

  1. Sign in to the Digital Guardian ARC portal.
  2. Go to ARC Tenant Settings.
  3. Copy and save the following values:
    • Tenant ID: This is your Client ID.
    • Authentication Token: This is your Client Secret.

Get gateway URLs from DGMC

  1. Sign in to the Digital Guardian Management Console (DGMC).
  2. Go to System > Configuration > Cloud Services.
  3. In the API Access section, record:
    • Access Gateway Base URL: e.g., https://accessgw-usw.msp.digitalguardian.com
    • Authorization Server URL: e.g., https://authsrv.msp.digitalguardian.com/as/token.oauth2
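
To show how these values fit together, here is a rough sketch of the OAuth 2.0 client-credentials request that is sent to the Authorization Server URL. It only builds the request and does not send it. The `build_token_request` helper and the placeholder credentials are assumptions for illustration; the `scope` value of `client` is taken from the function source in this document.

```python
import urllib.parse
import urllib.request


def build_token_request(
    auth_server_url: str, client_id: str, client_secret: str
) -> urllib.request.Request:
    """Build (but do not send) a client-credentials token request."""
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,          # ARC Tenant ID
        "client_secret": client_secret,  # ARC Authentication Token
        "scope": "client",
    }).encode("utf-8")
    return urllib.request.Request(
        auth_server_url,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


# Placeholder values; substitute the ones recorded from ARC and DGMC.
req = build_token_request(
    "https://authsrv.msp.digitalguardian.com/as/token.oauth2",
    "example-tenant-id",
    "example-auth-token",
)
print(req.get_method(), req.full_url)
```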

Create and configure an export profile

  1. In the DGMC, go to Admin > Reports > Export Profiles.
  2. Click Create Export Profile.
  3. Set Export Format to JSON Flattened Table.
  4. Click Save and copy the GUID from the profile details (format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
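
A mistyped GUID only surfaces later as an API error, so it can help to check the value before using it. The sketch below validates the hyphenated format and builds the export path used by the function source in this document. The `export_path` helper name and the sample GUID are made up for illustration.

```python
import uuid


def export_path(profile_guid: str) -> str:
    """Validate the GUID format and return the export endpoint path."""
    guid = profile_guid.strip()
    # uuid.UUID raises ValueError on malformed input. The comparison also
    # rejects the brace-wrapped and hyphen-less forms that uuid.UUID accepts.
    if str(uuid.UUID(guid)) != guid.lower():
        raise ValueError(
            f"Not in xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx form: {guid!r}"
        )
    return f"/rest/1.0/export/{guid}"


# Made-up GUID for illustration only.
print(export_path("123e4567-e89b-12d3-a456-426614174000"))
```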

Create the Cloud Run function

  • Prepare function source files

    • requirements.txt

        functions-framework==3.*
        google-cloud-storage==2.*
        urllib3==2.*

    • main.py

        """Cloud Run function to ingest Digital Guardian EDR logs into GCS."""
        import json
        import os
        import time
        import urllib.parse
        from datetime import datetime, timezone

        import functions_framework
        import urllib3
        from google.cloud import storage

        # Configuration is read from environment variables at cold start.
        GCS_BUCKET = os.environ["GCS_BUCKET"]
        GCS_PREFIX = os.environ.get("GCS_PREFIX", "digitalguardian_edr")
        STATE_KEY = os.environ.get("STATE_KEY", "digitalguardian_edr_state.json")
        AUTH_SERVER_URL = os.environ["AUTH_SERVER_URL"]
        ARC_SERVER_URL = os.environ["ARC_SERVER_URL"]
        CLIENT_ID = os.environ["CLIENT_ID"]
        CLIENT_SECRET = os.environ["CLIENT_SECRET"]
        EXPORT_PROFILE_GUID = os.environ["EXPORT_PROFILE_GUID"]
        MAX_RECORDS = int(os.environ.get("MAX_RECORDS", "10000"))

        http = urllib3.PoolManager()
        gcs = storage.Client()


        def _get_access_token() -> str:
            """Request an OAuth 2.0 access token with the client-credentials grant."""
            body = urllib.parse.urlencode({
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "scope": "client",
            })
            resp = http.request(
                "POST",
                AUTH_SERVER_URL,
                body=body,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
            if resp.status != 200:
                raise RuntimeError(f"OAuth token request failed: {resp.status}")
            return json.loads(resp.data.decode("utf-8"))["access_token"]


        def _arc_get(token: str, path: str, retries: int = 5) -> dict:
            """GET an ARC path, retrying with exponential backoff on HTTP 429."""
            url = f"{ARC_SERVER_URL}{path}"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            backoff = 2
            for attempt in range(retries):
                resp = http.request("GET", url, headers=headers)
                if resp.status == 200:
                    return json.loads(resp.data.decode("utf-8"))
                if resp.status == 429:
                    time.sleep(backoff * (2 ** attempt))
                    continue
                raise RuntimeError(f"ARC API error: {resp.status}")
            raise RuntimeError("ARC API rate limit exceeded.")


        def _arc_acknowledge(token: str) -> None:
            """Acknowledge the current chunk so the next export call advances."""
            url = f"{ARC_SERVER_URL}/rest/1.0/export/{EXPORT_PROFILE_GUID}/acknowledge"
            headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
            resp = http.request("POST", url, headers=headers)
            if resp.status not in (200, 204):
                raise RuntimeError(f"ARC acknowledge failed: {resp.status}")


        def _load_state() -> dict:
            """Read the state object from GCS, or return {} if it does not exist."""
            bucket = gcs.bucket(GCS_BUCKET)
            blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
            return json.loads(blob.download_as_text()) if blob.exists() else {}


        def _save_state(state: dict) -> None:
            """Write the state object back to GCS as JSON."""
            bucket = gcs.bucket(GCS_BUCKET)
            blob = bucket.blob(f"{GCS_PREFIX}/{STATE_KEY}")
            blob.upload_from_string(json.dumps(state), content_type="application/json")


        def _fetch_export(token: str) -> list:
            """Fetch the current export chunk for the configured profile."""
            path = f"/rest/1.0/export/{EXPORT_PROFILE_GUID}"
            data = _arc_get(token, path)
            # The response is either a bare list or an object with a "data" list.
            records = data if isinstance(data, list) else data.get("data", [])
            # Cap the batch at MAX_RECORDS; records past the cap are not written.
            return records[:MAX_RECORDS]


        def _write_ndjson(records: list, run_ts: str) -> str:
            """Upload records as NDJSON under a year/month/day partitioned path."""
            bucket = gcs.bucket(GCS_BUCKET)
            blob_path = (
                f"{GCS_PREFIX}/year={run_ts[:4]}/month={run_ts[5:7]}/day={run_ts[8:10]}/"
                f"{run_ts}_export.ndjson"
            )
            blob = bucket.blob(blob_path)
            ndjson = "\n".join(json.dumps(r, separators=(",", ":")) for r in records)
            blob.upload_from_string(ndjson, content_type="application/x-ndjson")
            return blob_path


        @functions_framework.cloud_event
        def main(cloud_event):
            state = _load_state()
            now = datetime.now(timezone.utc)
            token = _get_access_token()
            records = _fetch_export(token)
            if not records:
                return "OK"
            run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
            # Write to GCS first, then acknowledge, so a failed upload is not acknowledged.
            blob_path = _write_ndjson(records, run_ts)
            _arc_acknowledge(token)
            state["last_run"] = now.isoformat()
            _save_state(state)
            return "OK"
      
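Two details of the function are easy to check in isolation: the delays used when the ARC API returns HTTP 429, and the object name under which each run is stored. The sketch below reproduces both calculations from `_arc_get` and `_write_ndjson`. The helper names `retry_delays` and `object_path` and the example timestamp are illustrative assumptions.

```python
from datetime import datetime, timezone


def retry_delays(retries: int = 5, backoff: int = 2) -> list[int]:
    """Seconds slept after each HTTP 429, as computed in _arc_get."""
    return [backoff * (2 ** attempt) for attempt in range(retries)]


def object_path(prefix: str, now: datetime) -> str:
    """Object name built the same way as in _write_ndjson."""
    run_ts = now.strftime("%Y-%m-%dT%H%M%SZ")
    return (
        f"{prefix}/year={run_ts[:4]}/month={run_ts[5:7]}/day={run_ts[8:10]}/"
        f"{run_ts}_export.ndjson"
    )


# Arbitrary example timestamp.
example = datetime(2024, 3, 7, 14, 15, 0, tzinfo=timezone.utc)
print(retry_delays())
print(object_path("digitalguardian_edr", example))
```

With the default settings, a call that is rate limited on every attempt sleeps for 62 seconds in total before the function raises an error, which is worth keeping in mind when choosing a function timeout.
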
Configure a feed in Google SecOps to ingest Digital Guardian EDR logs

  1. Go to SIEM Settings > Feeds.
  2. Click Add New Feed.
  3. Select Google Cloud Storage V2 as the Source type.
  4. Select Digital Guardian EDR as the Log type.
  5. Click Get Service Account and copy the email address.
  6. Specify the Storage bucket URL: gs://digitalguardian-edr-logs/digitalguardian_edr/ (make sure to include the trailing slash).
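
Because the feed URL has to match the bucket and prefix that the function writes to, a small helper can build it consistently. This is a minimal sketch; the `feed_bucket_uri` name is hypothetical, and the arguments correspond to the `GCS_BUCKET` and `GCS_PREFIX` values used by the function.

```python
def feed_bucket_uri(bucket: str, prefix: str = "") -> str:
    """Return a gs:// URI for the feed that always ends with a slash."""
    prefix = prefix.strip("/")
    return f"gs://{bucket}/{prefix}/" if prefix else f"gs://{bucket}/"


print(feed_bucket_uri("digitalguardian-edr-logs", "digitalguardian_edr"))
```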

UDM mapping table

Log Field         UDM Mapping                     Logic
Application       target.application              Value copied directly.
Bytes_Written     network.sent_bytes              Converted to uinteger.
Command_Line      principal.process.command_line  Cleaned of trailing quotes.
Computer_Name     principal.hostname              Source hostname mapping.
Destination_File  target.file.names               Direct mapping.
IP_Address        principal.ip                    Principal IP address.
Process_PID       principal.process.pid           Process ID mapping.
Rule              security_result.rule_name       Direct mapping.
Severity          security_result.severity        Scaled to LOW, MEDIUM, HIGH, CRITICAL.
URL_Path          target.url                      Web destination mapping.
User_Name         principal.user.userid           Extracted via grok.
security_action   security_result.action          Direct mapping.
N/A               metadata.product_name           Set to "Enterprise DLP Platform".
N/A               metadata.vendor_name            Set to "DigitalGuardian".
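
To illustrate how a few rows of the table apply to a single record, the sketch below maps a raw log dictionary to flat, dotted UDM field names. It is only an illustration and not the parser that Google SecOps runs: the `to_udm_sketch` name is hypothetical, the flat key layout is a simplification, trailing quotes are assumed to be double quotes, and the Severity scaling and User_Name grok extraction are left out because the table does not specify them.

```python
def to_udm_sketch(raw: dict) -> dict:
    """Apply some of the mappings from the table to one raw record."""
    udm = {
        "metadata.vendor_name": "DigitalGuardian",
        "metadata.product_name": "Enterprise DLP Platform",
    }
    direct = {
        "Application": "target.application",
        "Computer_Name": "principal.hostname",
        "IP_Address": "principal.ip",
        "Process_PID": "principal.process.pid",
        "Rule": "security_result.rule_name",
        "URL_Path": "target.url",
    }
    for src, dst in direct.items():
        if raw.get(src) not in (None, ""):
            udm[dst] = raw[src]
    if raw.get("Bytes_Written") not in (None, ""):
        # Unsigned integer conversion; assumes a non-negative numeric value.
        udm["network.sent_bytes"] = int(raw["Bytes_Written"])
    if raw.get("Command_Line"):
        # Assumption: "trailing quotes" means trailing double-quote characters.
        udm["principal.process.command_line"] = raw["Command_Line"].rstrip('"')
    return udm


# Hypothetical record for illustration.
print(to_udm_sketch({"Computer_Name": "host-01", "Bytes_Written": "2048"}))
```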
