Receive and parse Pub/Sub messages about data profiles

This document provides examples that demonstrate how to receive and parse notifications about changes to your data profiles. Sensitive Data Protection sends these updates in the form of Pub/Sub messages.

Overview

You can configure Sensitive Data Protection to automatically generate profiles about data across an organization, folder, or project. Data profiles contain metrics and metadata about your data and help you determine where sensitive and high-risk data reside. Sensitive Data Protection reports these metrics at various levels of detail. For information about the types of data you can profile, see Supported resources.

When configuring the data profiler, you can turn on the option to publish Pub/Sub messages whenever significant changes in your data profiles occur. The messages help you take immediate action in response to those changes. The following are the events that you can listen for:

  • A data asset is profiled for the first time.
  • A profile is updated.
  • The risk or sensitivity score of a profile increases.
  • There is a new error related to your data profiles.
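Each message records which of these events triggered it in the `event` field of its DataProfilePubSubMessage payload. A listener typically branches on that field.

The following is a minimal sketch of that routing. It assumes the event names `NEW_PROFILE`, `CHANGED_PROFILE`, `SCORE_INCREASED`, and `ERROR_CHANGED` from `DataProfileAction.EventType`; confirm them against the API reference. The handler functions are hypothetical placeholders. With the Python client library, `dlp_msg.event.name` should give you the event name as a string to pass in.

```python
from typing import Callable, Dict


# Hypothetical handlers; replace the bodies with your own actions.
def on_new_profile(profile_name: str) -> str:
    return f"index new profile {profile_name}"


def on_changed_profile(profile_name: str) -> str:
    return f"refresh cached profile {profile_name}"


def on_score_increased(profile_name: str) -> str:
    return f"alert on higher risk for {profile_name}"


def on_error_changed(profile_name: str) -> str:
    return f"investigate profiling error for {profile_name}"


# Assumed event names, taken from DataProfileAction.EventType.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "NEW_PROFILE": on_new_profile,
    "CHANGED_PROFILE": on_changed_profile,
    "SCORE_INCREASED": on_score_increased,
    "ERROR_CHANGED": on_error_changed,
}


def route_event(event_name: str, profile_name: str) -> str:
    """Dispatch to the handler registered for event_name; ignore unknown events."""
    handler = HANDLERS.get(event_name)
    if handler is None:
        return f"ignored event {event_name}"
    return handler(profile_name)


print(route_event("SCORE_INCREASED", "example-profile"))
# alert on higher risk for example-profile
```

A dictionary keeps each event's logic separate. Unknown values, such as `EVENT_TYPE_UNSPECIFIED`, fall through to a no-op instead of raising.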

The Pub/Sub messages that the data profiler publishes contain a DataProfilePubSubMessage object. These messages are always sent in binary (protocol buffer) format, so you need to write code that receives them and parses the payload.
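Because the payload is a serialized protocol buffer, its bytes aren't human-readable text, and a JSON parser can't read them. If parsing fails and you want to see what actually arrived, the following stdlib-only sketch can help. It walks the protocol buffer wire format and lists each top-level field number and wire type, without the client library.

It's a debugging aid only, not a replacement for DataProfilePubSubMessage parsing. It doesn't map field numbers to field names, and it doesn't handle the deprecated group wire types (3 and 4).

```python
from typing import List, Tuple


def _read_varint(data: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def top_level_fields(data: bytes) -> List[Tuple[int, int]]:
    """List (field_number, wire_type) pairs for the top-level fields of a
    serialized protobuf message, skipping over each field's payload."""
    fields = []
    pos = 0
    while pos < len(data):
        key, pos = _read_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:  # varint, for example an enum value
            _, pos = _read_varint(data, pos)
        elif wire_type == 1:  # fixed 64-bit
            pos += 8
        elif wire_type == 2:  # length-delimited: nested message, string, or bytes
            length, pos = _read_varint(data, pos)
            pos += length
        elif wire_type == 5:  # fixed 32-bit
            pos += 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(data):
            raise ValueError("truncated field")
        fields.append((field_number, wire_type))
    return fields


# Field 1 as a 3-byte length-delimited value, then field 2 as the varint 150.
sample = bytes([0x0A, 0x03]) + b"abc" + bytes([0x10, 0x96, 0x01])
print(top_level_fields(sample))  # [(1, 2), (2, 0)]
```

In a subscriber callback, you could call `top_level_fields(message.data)` when parsing raises an error and log the result alongside the message ID.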

Pricing

When you use Pub/Sub, you are billed according to Pub/Sub pricing.

Before you begin

This page assumes the following:

Before you start working on the examples, follow these steps:

  1. Create a Pub/Sub topic and add a subscription for it. Don't assign a schema to the topic.

    For simplicity, the examples on this page listen to only one subscription. However, in practice, you can create a topic and subscription for each event that Sensitive Data Protection supports.

  2. If you haven't already done so, configure the data profiler to publish Pub/Sub messages:

    1. Edit your scan configuration.

    2. On the Edit scan configuration page, turn on the Publish to Pub/Sub option and select the events that you want to listen for. Then, configure the settings for each event.

    3. Save the scan configuration.

  3. Grant the Sensitive Data Protection service agent publishing access on the Pub/Sub topic. An example of a role that has publishing access is the Pub/Sub Publisher role (roles/pubsub.publisher). The Sensitive Data Protection service agent is an email address in the format:

      service-PROJECT_NUMBER@dlp-api.iam.gserviceaccount.com

    If you're working with an organization- or folder-level scan configuration, the PROJECT_NUMBER is the numerical identifier of the service agent container. If you're working with a project-level scan configuration, the PROJECT_NUMBER is the numerical identifier of your project.

  4. Install and set up the Sensitive Data Protection and Pub/Sub client libraries for Java or Python.
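The setup steps above involve a few fully qualified names. The following minimal, stdlib-only sketch builds them:

  • topic and subscription paths, in the `projects/{project}/topics/{topic}` and `projects/{project}/subscriptions/{subscription}` formats;
  • the service agent email from step 3;
  • an IAM policy binding that grants that email `roles/pubsub.publisher`.

The per-event naming scheme (`dlp-new-profile` and so on) and the event names are illustrative assumptions. Sensitive Data Protection doesn't require these names. For an organization- or folder-level scan configuration, pass the project number of the service agent container.

```python
from typing import Dict


def topic_path(project_id: str, topic_id: str) -> str:
    """Fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Fully qualified Pub/Sub subscription name."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def dlp_service_agent(project_number: str) -> str:
    """Email address of the Sensitive Data Protection service agent."""
    return f"service-{project_number}@dlp-api.iam.gserviceaccount.com"


def publisher_binding(project_number: str) -> Dict[str, object]:
    """IAM policy binding that lets the service agent publish to a topic."""
    return {
        "role": "roles/pubsub.publisher",
        "members": [f"serviceAccount:{dlp_service_agent(project_number)}"],
    }


# Hypothetical naming scheme: one topic and subscription per event.
EVENTS = ["NEW_PROFILE", "CHANGED_PROFILE", "SCORE_INCREASED", "ERROR_CHANGED"]


def per_event_resources(project_id: str) -> Dict[str, Dict[str, str]]:
    """Map each assumed event name to a topic and subscription path."""
    resources = {}
    for event in EVENTS:
        suffix = event.lower().replace("_", "-")
        resources[event] = {
            "topic": topic_path(project_id, f"dlp-{suffix}"),
            "subscription": subscription_path(project_id, f"dlp-{suffix}-sub"),
        }
    return resources


print(dlp_service_agent("123456789012"))
# service-123456789012@dlp-api.iam.gserviceaccount.com
print(per_event_resources("my-project")["SCORE_INCREASED"]["topic"])
# projects/my-project/topics/dlp-score-increased
```

You can apply the binding to each topic through the Google Cloud console or with the `gcloud pubsub topics add-iam-policy-binding` command. Remember not to assign a schema to the topics.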

Examples

The following examples demonstrate how to receive and parse Pub/Sub messages that the data profiler publishes. You can repurpose these examples and deploy them as Cloud Run functions that are triggered by Pub/Sub events. For more information, see Pub/Sub tutorial (2nd gen).

In the following examples, replace the following:

  • PROJECT_ID : the ID of the project that contains the Pub/Sub subscription.
  • SUBSCRIPTION_ID : the ID of the Pub/Sub subscription.

Java

    import com.google.api.core.ApiService;
    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.privacy.dlp.v2.DataProfilePubSubMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class DataProfilePubSubMessageParser {

      public static void main(String... args) throws Exception {
        String projectId = "PROJECT_ID";
        String subscriptionId = "SUBSCRIPTION_ID";
        int timeoutSeconds = 5;

        // The `ProjectSubscriptionName.of` method creates a fully qualified identifier
        // in the form `projects/{projectId}/subscriptions/{subscriptionId}`.
        ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(projectId, subscriptionId);

        MessageReceiver receiver =
            (PubsubMessage pubsubMessage, AckReplyConsumer consumer) -> {
              try {
                // The message payload is a binary-encoded DataProfilePubSubMessage.
                DataProfilePubSubMessage message =
                    DataProfilePubSubMessage.parseFrom(pubsubMessage.getData());
                System.out.println(
                    "PubsubMessage with ID: " + pubsubMessage.getMessageId()
                        + "; message size: " + pubsubMessage.getData().size()
                        + "; event: " + message.getEvent()
                        + "; profile name: " + message.getProfile().getName()
                        + "; full resource: " + message.getProfile().getFullResource());
                consumer.ack();
              } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                // Negatively acknowledge so that Pub/Sub can redeliver the message.
                // A payload that can never be parsed is redelivered repeatedly, so
                // consider a dead-letter topic on the subscription.
                consumer.nack();
              }
            };

        // Create subscriber client.
        Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
        try {
          ApiService apiService = subscriber.startAsync();
          apiService.awaitRunning();
          System.out.printf(
              "Listening for messages on %s for %d seconds.%n", subscriptionName, timeoutSeconds);
          subscriber.awaitTerminated(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException ignored) {
          // Expected: the subscriber keeps running until the timeout elapses.
        } finally {
          subscriber.stopAsync();
        }
      }
    }
 

Python

    from concurrent.futures import TimeoutError

    from google.cloud import dlp_v2
    from google.cloud import pubsub_v1

    project_id = "PROJECT_ID"
    subscription_id = "SUBSCRIPTION_ID"
    timeout = 5.0

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_id}`.
    subscription_path = subscriber.subscription_path(project_id, subscription_id)


    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received {message.data}.")
        # The payload is a binary-encoded DataProfilePubSubMessage.
        dlp_msg = dlp_v2.DataProfilePubSubMessage()
        dlp_msg._pb.ParseFromString(message.data)
        print("Parsed message: ", dlp_msg)
        print("--------")
        message.ack()


    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path} for {timeout} seconds...")

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.

    print("Done waiting.")
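As noted earlier, you can adapt these examples as Cloud Run functions triggered by Pub/Sub events. In that case, the function receives the message inside a CloudEvent. The payload arrives as a base64-encoded string, so you decode it before parsing.

The following minimal sketch assumes the event data shape `{"message": {"data": "..."}}` used in the Pub/Sub tutorial. The function names are hypothetical. The commented-out handler assumes the `functions-framework` and `google-cloud-dlp` packages and reuses the parsing approach from the Python example.

```python
import base64
from typing import Any, Dict


def extract_payload(event_data: Dict[str, Any]) -> bytes:
    """Return the raw Pub/Sub message bytes carried by a CloudEvent.

    Assumes event_data is shaped like
    {"message": {"data": "<base64 string>", ...}, "subscription": "..."}.
    Returns b"" when the message has no data.
    """
    encoded = event_data.get("message", {}).get("data", "")
    return base64.b64decode(encoded)


# Hypothetical deployed handler (requires functions-framework and google-cloud-dlp):
#
#   import functions_framework
#   from google.cloud import dlp_v2
#
#   @functions_framework.cloud_event
#   def on_profile_event(cloud_event):
#       dlp_msg = dlp_v2.DataProfilePubSubMessage()
#       dlp_msg._pb.ParseFromString(extract_payload(cloud_event.data))
#       print("Parsed message: ", dlp_msg)

sample = {"message": {"data": base64.b64encode(b"\x10\x01").decode("ascii")}}
print(extract_payload(sample))  # b'\x10\x01'
```

Keeping the decoding step in a plain function lets you test it locally with a hand-built dictionary, without deploying the function.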
 

What's next
