You can use Sensitive Data Protection to compute numerical and categorical numerical statistics for individual columns in BigQuery tables. Sensitive Data Protection can calculate the following:
- The column's minimum value
- The column's maximum value
- Quantile values for the column
- A histogram of value frequencies in the column
Compute numerical statistics
You can determine minimum, maximum, and quantile values for an individual
BigQuery column. To calculate these values, you configure a DlpJob,
setting the NumericalStatsConfig privacy metric to the name of the column
to scan. When you run the job, Sensitive Data Protection computes
statistics for the given column, returning its results in the
NumericalStatsResult object. Sensitive Data Protection can compute
statistics for the following number types:
- integer
- float
- date
- datetime
- timestamp
- time
The statistics that a scan run returns include the minimum value, the maximum value, and 99 quantile values that partition the set of field values into 100 equal-sized buckets.
Code examples
The following sample code, in several languages, demonstrates how to use Sensitive Data Protection to calculate numerical statistics.
C#
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
using
Google.Api.Gax.ResourceNames
;
using
Google.Cloud.Dlp.V2
;
using
Google.Cloud.PubSub.V1
;
using
Newtonsoft.Json
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
static
Google
.
Cloud
.
Dlp
.
V2
.
Action
.
Types
;
using
static
Google
.
Cloud
.
Dlp
.
V2
.
PrivacyMetric
.
Types
;
public class RiskAnalysisCreateNumericalStats
{
    /// <summary>
    /// Runs a DLP risk-analysis job that computes numerical statistics (minimum,
    /// maximum and quantile values) for one column of a BigQuery table. It waits
    /// for the job's Pub/Sub completion notification and prints the results to
    /// the console.
    /// </summary>
    /// <param name="callingProjectId">Project the DLP job, topic and subscription belong to.</param>
    /// <param name="tableProjectId">Project that contains the BigQuery table.</param>
    /// <param name="datasetId">BigQuery dataset ID.</param>
    /// <param name="tableId">BigQuery table ID.</param>
    /// <param name="topicId">Existing Pub/Sub topic the job publishes its completion notification to.</param>
    /// <param name="subscriptionId">Existing subscription used to listen for that notification.</param>
    /// <param name="columnName">Name of the column to analyze.</param>
    /// <returns>The numerical statistics computed by the job.</returns>
    /// <exception cref="TimeoutException">No completion notification arrived within 10 minutes.</exception>
    /// <exception cref="InvalidOperationException">The job ended without numerical statistics.</exception>
    public static AnalyzeDataSourceRiskDetails.Types.NumericalStatsResult NumericalStats(
        string callingProjectId,
        string tableProjectId,
        string datasetId,
        string tableId,
        string topicId,
        string subscriptionId,
        string columnName)
    {
        var dlp = DlpServiceClient.Create();

        // Construct + submit the job
        var config = new RiskAnalysisJobConfig
        {
            PrivacyMetric = new PrivacyMetric
            {
                NumericalStatsConfig = new NumericalStatsConfig
                {
                    Field = new FieldId { Name = columnName }
                }
            },
            SourceTable = new BigQueryTable
            {
                ProjectId = tableProjectId,
                DatasetId = datasetId,
                TableId = tableId
            },
            Actions =
            {
                new Google.Cloud.Dlp.V2.Action
                {
                    PubSub = new PublishToPubSub
                    {
                        Topic = $"projects/{callingProjectId}/topics/{topicId}"
                    }
                }
            }
        };

        var submittedJob = dlp.CreateDlpJob(new CreateDlpJobRequest
        {
            ParentAsProjectName = new ProjectName(callingProjectId),
            RiskJob = config
        });

        // Listen to pub/sub for the job
        var subscriptionName = new SubscriptionName(callingProjectId, subscriptionId);
        var subscriber = SubscriberClient.CreateAsync(subscriptionName).Result;

        // SubscriberClient may run the handler on multiple threads, so completion is
        // signalled through a thread-safe event.
        using var done = new ManualResetEventSlim(false);
        subscriber.StartAsync(async (PubsubMessage message, CancellationToken cancel) =>
        {
            // TryGetValue: messages without the attribute must not crash the handler.
            // Anything that is not this job's notification is nacked for redelivery.
            if (!message.Attributes.TryGetValue("DlpJobName", out var jobName) || jobName != submittedJob.Name)
            {
                return SubscriberClient.Reply.Nack;
            }

            // Wait for DLP API results to become consistent (without blocking a thread).
            await Task.Delay(500);
            done.Set();
            return SubscriberClient.Reply.Ack;
        });

        // 10 minute timeout; may not work for large jobs
        bool completed = done.Wait(TimeSpan.FromMinutes(10));
        subscriber.StopAsync(CancellationToken.None).Wait();
        if (!completed)
        {
            throw new TimeoutException(
                $"DLP job {submittedJob.Name} did not report completion within 10 minutes.");
        }

        // Process results
        var resultJob = dlp.GetDlpJob(new GetDlpJobRequest
        {
            DlpJobName = DlpJobName.Parse(submittedJob.Name)
        });

        // RiskDetails may be unset when the job produced no risk results (e.g. it failed).
        // Fail with a descriptive error instead of a NullReferenceException.
        var result = resultJob.RiskDetails?.NumericalStatsResult
            ?? throw new InvalidOperationException(
                $"DLP job {resultJob.Name} ended in state {resultJob.State} without numerical statistics.");

        // 'UnpackValue(x)' is a prettier version of 'x.toString()'
        Console.WriteLine($"Value Range: [{UnpackValue(result.MinValue)}, {UnpackValue(result.MaxValue)}]");

        // Print a quantile only when its value differs from the previous quantile's value.
        var lastValue = string.Empty;
        for (var quantile = 0; quantile < result.QuantileValues.Count; quantile++)
        {
            var currentValue = UnpackValue(result.QuantileValues[quantile]);
            if (lastValue != currentValue)
            {
                Console.WriteLine($"Value at {quantile + 1}% quantile: {currentValue}");
            }
            lastValue = currentValue;
        }

        return result;
    }

    /// <summary>
    /// Renders a DLP <see cref="Value"/> as a short string by extracting the single
    /// field that is set (for example "42" from <c>{ "integerValue": "42" }</c>).
    /// </summary>
    /// <param name="protoValue">The value to render; may be null.</param>
    /// <returns>The set field's value as text, or an empty string when no field is set.</returns>
    public static string UnpackValue(Value protoValue)
    {
        if (protoValue is null)
        {
            return string.Empty;
        }

        // Value.ToString() yields the JSON form of the message.
        var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
        return jsonValue?.Values.FirstOrDefault()?.ToString() ?? string.Empty;
    }
}
Go
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
(
"context"
"fmt"
"io"
"time"
dlp
"cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// riskNumerical computes the numerical risk of the given column.
func
riskNumerical
(
w
io
.
Writer
,
projectID
,
dataProject
,
pubSubTopic
,
pubSubSub
,
datasetID
,
tableID
,
columnName
string
)
error
{
// projectID := "my-project-id"
// dataProject := "bigquery-public-data"
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// datasetID := "nhtsa_traffic_fatalities"
// tableID := "accident_2015"
// columnName := "state_number"
ctx
:=
context
.
Background
()
client
,
err
:=
dlp
.
NewClient
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"dlp.NewClient: %w"
,
err
)
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
err
}
defer
pubsubClient
.
Close
()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t
:=
pubsubClient
.
Topic
(
pubSubTopic
)
topicExists
,
err
:=
t
.
Exists
(
ctx
)
if
err
!=
nil
{
return
err
}
if
!
topicExists
{
if
t
,
err
=
pubsubClient
.
CreateTopic
(
ctx
,
pubSubTopic
);
err
!=
nil
{
return
err
}
}
// Create the Subscription if it doesn't exist.
s
:=
pubsubClient
.
Subscription
(
pubSubSub
)
subExists
,
err
:=
s
.
Exists
(
ctx
)
if
err
!=
nil
{
return
err
}
if
!
subExists
{
if
s
,
err
=
pubsubClient
.
CreateSubscription
(
ctx
,
pubSubSub
,
pubsub
.
SubscriptionConfig
{
Topic
:
t
});
err
!=
nil
{
return
err
}
}
// topic is the PubSub topic string where messages should be sent.
topic
:=
"projects/"
+
projectID
+
"/topics/"
+
pubSubTopic
// Create a configured request.
req
:=
& dlppb
.
CreateDlpJobRequest
{
Parent
:
fmt
.
Sprintf
(
"projects/%s/locations/global"
,
projectID
),
Job
:
& dlppb
.
CreateDlpJobRequest_RiskJob
{
RiskJob
:
& dlppb
.
RiskAnalysisJobConfig
{
// PrivacyMetric configures what to compute.
PrivacyMetric
:
& dlppb
.
PrivacyMetric
{
Type
:
& dlppb
.
PrivacyMetric_NumericalStatsConfig_
{
NumericalStatsConfig
:
& dlppb
.
PrivacyMetric_NumericalStatsConfig
{
Field
:
& dlppb
.
FieldId
{
Name
:
columnName
,
},
},
},
},
// SourceTable describes where to find the data.
SourceTable
:
& dlppb
.
BigQueryTable
{
ProjectId
:
dataProject
,
DatasetId
:
datasetID
,
TableId
:
tableID
,
},
// Send a message to PubSub using Actions.
Actions
:
[]
*
dlppb
.
Action
{
{
Action
:
& dlppb
.
Action_PubSub
{
PubSub
:
& dlppb
.
Action_PublishToPubSub
{
Topic
:
topic
,
},
},
},
},
},
},
}
// Create the risk job.
j
,
err
:=
client
.
CreateDlpJob
(
ctx
,
req
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"CreateDlpJob: %w"
,
err
)
}
fmt
.
Fprintf
(
w
,
"Created job: %v\n"
,
j
.
GetName
())
// Wait for the risk job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Minute
)
defer
cancel
()
err
=
s
.
Receive
(
ctx
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
// If this is the wrong job, do not process the result.
if
msg
.
Attributes
[
"DlpJobName"
]
!=
j
.
GetName
()
{
msg
.
Nack
()
return
}
msg
.
Ack
()
time
.
Sleep
(
500
*
time
.
Millisecond
)
resp
,
err
:=
client
.
GetDlpJob
(
ctx
,
& dlppb
.
GetDlpJobRequest
{
Name
:
j
.
GetName
(),
})
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"GetDlpJob: %v"
,
err
)
return
}
n
:=
resp
.
GetRiskDetails
().
GetNumericalStatsResult
()
fmt
.
Fprintf
(
w
,
"Value range: [%v, %v]\n"
,
n
.
GetMinValue
(),
n
.
GetMaxValue
())
var
tmp
string
for
p
,
v
:=
range
n
.
GetQuantileValues
()
{
if
v
.
String
()
!=
tmp
{
fmt
.
Fprintf
(
w
,
"Value at %v quantile: %v\n"
,
p
,
v
)
tmp
=
v
.
String
()
}
}
// Stop listening for more messages.
cancel
()
})
if
err
!=
nil
{
return
fmt
.
Errorf
(
"Recieve: %w"
,
err
)
}
return
nil
}
Java
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
com.google.api.core. SettableApiFuture
;
import
com.google.cloud.dlp.v2. DlpServiceClient
;
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.privacy.dlp.v2. Action
;
import
com.google.privacy.dlp.v2. Action
. PublishToPubSub
;
import
com.google.privacy.dlp.v2. AnalyzeDataSourceRiskDetails
. NumericalStatsResult
;
import
com.google.privacy.dlp.v2. BigQueryTable
;
import
com.google.privacy.dlp.v2. CreateDlpJobRequest
;
import
com.google.privacy.dlp.v2. DlpJob
;
import
com.google.privacy.dlp.v2. FieldId
;
import
com.google.privacy.dlp.v2. GetDlpJobRequest
;
import
com.google.privacy.dlp.v2. LocationName
;
import
com.google.privacy.dlp.v2. PrivacyMetric
;
import
com.google.privacy.dlp.v2. PrivacyMetric
. NumericalStatsConfig
;
import
com.google.privacy.dlp.v2. RiskAnalysisJobConfig
;
import
com.google.privacy.dlp.v2. Value
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. ProjectTopicName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
java.io.IOException
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
class
RiskAnalysisNumericalStats
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
String
datasetId
=
"your-bigquery-dataset-id"
;
String
tableId
=
"your-bigquery-table-id"
;
String
topicId
=
"pub-sub-topic"
;
String
subscriptionId
=
"pub-sub-subscription"
;
numericalStatsAnalysis
(
projectId
,
datasetId
,
tableId
,
topicId
,
subscriptionId
);
}
public
static
void
numericalStatsAnalysis
(
String
projectId
,
String
datasetId
,
String
tableId
,
String
topicId
,
String
subscriptionId
)
throws
ExecutionException
,
InterruptedException
,
IOException
{
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try
(
DlpServiceClient
dlpServiceClient
=
DlpServiceClient
.
create
())
{
// Specify the BigQuery table to analyze
BigQueryTable
bigQueryTable
=
BigQueryTable
.
newBuilder
()
.
setTableId
(
tableId
)
.
setDatasetId
(
datasetId
)
.
setProjectId
(
projectId
)
.
build
();
// This represents the name of the column to analyze, which must contain numerical data
String
columnName
=
"Age"
;
// Configure the privacy metric for the job
FieldId
fieldId
=
FieldId
.
newBuilder
().
setName
(
columnName
).
build
();
NumericalStatsConfig
numericalStatsConfig
=
NumericalStatsConfig
.
newBuilder
().
setField
(
fieldId
).
build
();
PrivacyMetric
privacyMetric
=
PrivacyMetric
.
newBuilder
().
setNumericalStatsConfig
(
numericalStatsConfig
).
build
();
// Create action to publish job status notifications over Google Cloud Pub/Sub
ProjectTopicName
topicName
=
ProjectTopicName
.
of
(
projectId
,
topicId
);
PublishToPubSub
publishToPubSub
=
PublishToPubSub
.
newBuilder
().
setTopic
(
topicName
.
toString
()).
build
();
Action
action
=
Action
.
newBuilder
().
setPubSub
(
publishToPubSub
).
build
();
// Configure the risk analysis job to perform
RiskAnalysisJobConfig
riskAnalysisJobConfig
=
RiskAnalysisJobConfig
.
newBuilder
()
.
setSourceTable
(
bigQueryTable
)
.
setPrivacyMetric
(
privacyMetric
)
.
addActions
(
action
)
.
build
();
CreateDlpJobRequest
createDlpJobRequest
=
CreateDlpJobRequest
.
newBuilder
()
.
setParent
(
LocationName
.
of
(
projectId
,
"global"
).
toString
())
.
setRiskJob
(
riskAnalysisJobConfig
)
.
build
();
// Send the request to the API using the client
DlpJob
dlpJob
=
dlpServiceClient
.
createDlpJob
(
createDlpJobRequest
);
// Set up a Pub/Sub subscriber to listen on the job completion status
final
SettableApiFuture<Boolean>
done
=
SettableApiFuture
.
create
();
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
MessageReceiver
messageHandler
=
(
PubsubMessage
pubsubMessage
,
AckReplyConsumer
ackReplyConsumer
)
-
>
{
handleMessage
(
dlpJob
,
done
,
pubsubMessage
,
ackReplyConsumer
);
};
Subscriber
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
messageHandler
).
build
();
subscriber
.
startAsync
();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try
{
done
.
get
(
15
,
TimeUnit
.
MINUTES
);
}
catch
(
TimeoutException
e
)
{
System
.
out
.
println
(
"Job was not completed after 15 minutes."
);
return
;
}
finally
{
subscriber
.
stopAsync
();
subscriber
.
awaitTerminated
();
}
// Build a request to get the completed job
GetDlpJobRequest
getDlpJobRequest
=
GetDlpJobRequest
.
newBuilder
().
setName
(
dlpJob
.
getName
()).
build
();
// Retrieve completed job status
DlpJob
completedJob
=
dlpServiceClient
.
getDlpJob
(
getDlpJobRequest
);
System
.
out
.
println
(
"Job status: "
+
completedJob
.
getState
());
System
.
out
.
println
(
"Job name: "
+
dlpJob
.
getName
());
// Get the result and parse through and process the information
NumericalStatsResult
result
=
completedJob
.
getRiskDetails
().
getNumericalStatsResult
();
System
.
out
.
printf
(
"Value range : [%.3f, %.3f]\n"
,
result
.
getMinValue
().
getFloatValue
(),
result
.
getMaxValue
().
getFloatValue
());
int
percent
=
1
;
Double
lastValue
=
null
;
for
(
Value
quantileValue
:
result
.
getQuantileValuesList
())
{
Double
currentValue
=
quantileValue
.
getFloatValue
();
if
(
lastValue
==
null
||
!
lastValue
.
equals
(
currentValue
))
{
System
.
out
.
printf
(
"Value at %s %% quantile : %.3f"
,
percent
,
currentValue
);
}
lastValue
=
currentValue
;
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private
static
void
handleMessage
(
DlpJob
job
,
SettableApiFuture<Boolean>
done
,
PubsubMessage
pubsubMessage
,
AckReplyConsumer
ackReplyConsumer
)
{
String
messageAttribute
=
pubsubMessage
.
getAttributesMap
().
get
(
"DlpJobName"
);
if
(
job
.
getName
().
equals
(
messageAttribute
))
{
done
.
set
(
true
);
ack
ReplyConsumer .
ack
();
}
else
{
ackReplyConsumer
.
nack
();
}
}
}
Node.js
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// Import the Google Cloud client libraries
// (module specifiers must be exact: no surrounding whitespace or newlines).
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the column to compute risk metrics for, e.g. 'age'
// Note that this column must be a numeric data type
// const columnName = 'age';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC';

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION';
/**
 * Starts a DLP risk-analysis job that computes numerical statistics for
 * `columnName` and waits for its Pub/Sub completion notification. It then
 * prints the value range and each distinct quantile value.
 *
 * Reads the module-level configuration (projectId, tableProjectId, datasetId,
 * tableId, columnName, topicId, subscriptionId) and clients (dlp, pubsub).
 * Rejects if the subscription emits an error or no notification arrives
 * within 10 minutes.
 */
async function numericalRiskAnalysis() {
  const sourceTable = {
    projectId: tableProjectId,
    datasetId: datasetId,
    tableId: tableId,
  };

  // Construct request for creating a risk analysis job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    riskJob: {
      privacyMetric: {
        numericalStatsConfig: {
          field: {
            name: columnName,
          },
        },
      },
      sourceTable: sourceTable,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Create helper function for unpacking values
  const getValue = obj => obj[Object.keys(obj)[0]];

  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  console.log(`Job created. Job name: ${jobName}`);

  // Watch the Pub/Sub topic until the DLP job finishes. Give up after
  // 10 minutes so the promise can never hang forever.
  await new Promise((resolve, reject) => {
    let timer = null;
    const cleanup = () => {
      clearTimeout(timer);
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
    };
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        cleanup();
        resolve(jobName);
      } else {
        message.nack();
      }
    };
    const errorHandler = err => {
      cleanup();
      reject(err);
    };
    timer = setTimeout(() => {
      cleanup();
      reject(new Error(`Timed out waiting for DLP job ${jobName} to finish`));
    }, 10 * 60 * 1000);

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });

  // Actually pause before fetching results so the finished job's details are
  // consistent (a bare setTimeout callback would not delay the next line).
  console.log('Waiting for DLP job to fully complete');
  await new Promise(resolve => setTimeout(resolve, 500));

  const [job] = await dlp.getDlpJob({name: jobName});
  const results = job.riskDetails.numericalStatsResult;

  console.log(
    `Value Range: [${getValue(results.minValue)}, ${getValue(results.maxValue)}]`
  );

  // Print unique quantile values
  let tempValue = null;
  results.quantileValues.forEach((result, percent) => {
    const value = getValue(result);

    // Only print new values
    if (
      tempValue !== value &&
      !(tempValue && tempValue.equals && tempValue.equals(value))
    ) {
      console.log(`Value at ${percent} % quantile: ${value}`);
      tempValue = value;
    }
  });
}
// Top-level `await` is a SyntaxError in a CommonJS module (this file uses
// `require`), so run the sample through the returned promise instead and
// surface failures via the exit code.
numericalRiskAnalysis().catch(err => {
  console.error(err);
  process.exitCode = 1;
});
PHP
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\PrivacyMetric\NumericalStatsConfig;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
 * Computes numerical statistics (minimum, maximum and quantile values) for a
 * column of numbers in a Google BigQuery table and prints them.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target BigQuery table
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *                               completion notifications
 * @param string $datasetId The ID of the BigQuery dataset to inspect
 * @param string $tableId The ID of the BigQuery table to inspect
 * @param string $columnName The name of the column to compute risk metrics for, e.g. "age"
 */
function numerical_stats(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    string $columnName
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);

    $statsConfig = (new NumericalStatsConfig())
        ->setField($columnField);

    $privacyMetric = (new PrivacyMetric())
        ->setNumericalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setRiskJob($riskJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName'])
                && $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Get the updated job. Loop to avoid a race condition with the DLP API,
                // pausing between polls so the loop does not hammer the API.
                $getDlpJobRequest = (new GetDlpJobRequest())
                    ->setName($job->getName());
                $job = $dlp->getDlpJob($getDlpJobRequest);
                while ($job->getState() == JobState::RUNNING) {
                    sleep(1);
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                }
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Helper function to convert Protobuf values to strings
    $valueToString = function ($value) {
        $json = json_decode($value->serializeToJsonString(), true);
        return array_shift($json);
    };

    // Print the job's status and results
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $results = $job->getRiskDetails()->getNumericalStatsResult();
            printf(
                'Value range: [%s, %s]' . PHP_EOL,
                $valueToString($results->getMinValue()),
                $valueToString($results->getMaxValue())
            );

            // Only print unique values
            $lastValue = null;
            foreach ($results->getQuantileValues() as $percent => $quantileValue) {
                $value = $valueToString($quantileValue);
                if ($value != $lastValue) {
                    printf('Value at %s quantile: %s' . PHP_EOL, $percent, $value);
                    $lastValue = $value;
                }
            }
            break;
        case JobState::FAILED:
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            $errors = $job->getErrors();
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state. Most likely, the job is either running or has not yet started.' . PHP_EOL);
    }
}
Python
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
concurrent.futures
import
google.cloud.dlp
import
google.cloud.pubsub
def numerical_risk_analysis(
    project: str,
    table_project_id: str,
    dataset_id: str,
    table_id: str,
    column_name: str,
    topic_id: str,
    subscription_id: str,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to compute risk metrics of a column
    of numerical data in a Google BigQuery table.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for a response from the API.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    def unpack_value(value) -> object:
        # A DLP Value holds one member of its "type" oneof (integer, float,
        # string, timestamp, date, ...). Reading only integer_value would
        # report 0 for float/date/time columns, so return whichever member is
        # actually set, or None if nothing is set.
        kind = type(value).pb(value).WhichOneof("type")
        return getattr(value, kind) if kind else None

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure risk analysis job
    # Give the name of the numeric column to compute risk metrics for
    risk_job = {
        "privacy_metric": {"numerical_stats_config": {"field": {"name": column_name}}},
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        # .get(): messages without the attribute are simply "not ours" rather
        # than a KeyError inside the subscriber callback.
        if message.attributes.get("DlpJobName") == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            print(f"Job name: {job.name}")
            results = job.risk_details.numerical_stats_result
            print(
                "Value Range: [{}, {}]".format(
                    unpack_value(results.min_value),
                    unpack_value(results.max_value),
                )
            )
            prev_value = None
            for percent, result in enumerate(results.quantile_values):
                value = unpack_value(result)
                if prev_value != value:
                    print(f"Value at {percent}% quantile: {value}")
                    prev_value = value
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
        subscription.close()
Compute categorical numerical statistics
You can compute categorical numerical statistics for the individual histogram buckets within a BigQuery column, including:
- Upper bound on value frequency within a given bucket
- Lower bound on value frequency within a given bucket
- Size of a given bucket
- A sample of value frequencies within a given bucket (maximum 20)
To calculate these values, you configure a DlpJob, setting the
CategoricalStatsConfig privacy metric to the name of the column to scan.
When you run the job, Sensitive Data Protection computes statistics for the
given column, returning its results in the CategoricalStatsResult object.
Code examples
The following sample code, in several languages, demonstrates how to use Sensitive Data Protection to calculate categorical statistics.
C#
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
using
Google.Api.Gax.ResourceNames
;
using
Google.Cloud.Dlp.V2
;
using
Google.Cloud.PubSub.V1
;
using
Newtonsoft.Json
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
static
Google
.
Cloud
.
Dlp
.
V2
.
Action
.
Types
;
using
static
Google
.
Cloud
.
Dlp
.
V2
.
PrivacyMetric
.
Types
;
public
class
RiskAnalysisCreateCategoricalStats
{
public
static
AnalyzeDataSourceRiskDetails
.
Types
.
CategoricalStatsResult
CategoricalStats
(
string
callingProjectId
,
string
tableProjectId
,
string
datasetId
,
string
tableId
,
string
topicId
,
string
subscriptionId
,
string
columnName
)
{
var
dlp
=
DlpServiceClient
.
Create
();
// Construct + submit the job
var
config
=
new
RiskAnalysisJobConfig
{
PrivacyMetric
=
new
PrivacyMetric
{
CategoricalStatsConfig
=
new
CategoricalStatsConfig
()
{
Field
=
new
FieldId
{
Name
=
columnName
}
}
},
SourceTable
=
new
BigQueryTable
{
ProjectId
=
tableProjectId
,
DatasetId
=
datasetId
,
TableId
=
tableId
},
Actions
=
{
new
Google
.
Cloud
.
Dlp
.
V2
.
Action
{
PubSub
=
new
PublishToPubSub
{
Topic
=
$"projects/{callingProjectId}/topics/{topicId}"
}
}
}
};
var
submittedJob
=
dlp
.
CreateDlpJob
(
new
CreateDlpJobRequest
{
ParentAsProjectName
=
new
ProjectName
(
callingProjectId
),
RiskJob
=
config
});
// Listen to pub/sub for the job
var
subscriptionName
=
new
SubscriptionName
(
callingProjectId
,
subscriptionId
);
var
subscriber
=
SubscriberClient
.
CreateAsync
(
subscriptionName
).
Result
;
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
var
done
=
new
ManualResetEventSlim
(
false
);
subscriber
.
StartAsync
((
PubsubMessage
message
,
CancellationToken
cancel
)
=
>
{
if
(
message
.
Attributes
[
"DlpJobName"
]
==
submittedJob
.
Name
)
{
Thread
.
Sleep
(
500
);
// Wait for DLP API results to become consistent
done
.
Set
();
return
Task
.
FromResult
(
SubscriberClient
.
Reply
.
Ack
);
}
else
{
return
Task
.
FromResult
(
SubscriberClient
.
Reply
.
Nack
);
}
});
done
.
Wait
(
TimeSpan
.
FromMinutes
(
10
));
// 10 minute timeout; may not work for large jobs
subscriber
.
StopAsync
(
CancellationToken
.
None
).
Wait
();
// Process results
var
resultJob
=
dlp
.
GetDlpJob
(
new
GetDlpJobRequest
{
DlpJobName
=
DlpJobName
.
Parse
(
submittedJob
.
Name
)
});
var
result
=
resultJob
.
RiskDetails
.
CategoricalStatsResult
;
for
(
var
bucketIdx
=
0
;
bucketIdx
<
result
.
ValueFrequencyHistogramBuckets
.
Count
;
bucketIdx
++
)
{
var
bucket
=
result
.
ValueFrequencyHistogramBuckets
[
bucketIdx
];
Console
.
WriteLine
(
$"Bucket {bucketIdx}"
);
Console
.
WriteLine
(
$" Most common value occurs {bucket. ValueFrequencyUpperBound
} time(s)."
);
Console
.
WriteLine
(
$" Least common value occurs {bucket. ValueFrequencyLowerBound
} time(s)."
);
Console
.
WriteLine
(
$" {bucket.BucketSize} unique value(s) total."
);
foreach
(
var
bucketValue
in
bucket
.
BucketValues
)
{
// 'UnpackValue(x)' is a prettier version of 'x.toString()'
Console
.
WriteLine
(
$" Value {UnpackValue(bucketValue. Value
)} occurs {bucketValue.Count} time(s)."
);
}
}
return
result
;
}
/// <summary>
/// Renders a DLP <see cref="Value"/> as a plain string by taking the first
/// field of its JSON form (for example <c>{ "stringValue": "CA" }</c> becomes <c>CA</c>).
/// </summary>
/// <param name="protoValue">The value to render.</param>
/// <returns>
/// The rendered value, or <see cref="string.Empty"/> when the value has no field set.
/// </returns>
public static string UnpackValue(Value protoValue)
{
    // protoValue.ToString() is parsed as a JSON object whose single key is the populated field.
    var jsonValue = JsonConvert.DeserializeObject<Dictionary<string, object>>(protoValue.ToString());
    // An unset Value has no keys; ElementAt(0) would throw, so fall back to an empty string.
    return jsonValue?.Values.FirstOrDefault()?.ToString() ?? string.Empty;
}
}
Go
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
(
"context"
"fmt"
"io"
"time"
dlp
"cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// riskCategorical computes the categorical risk of the given data.
func
riskCategorical
(
w
io
.
Writer
,
projectID
,
dataProject
,
pubSubTopic
,
pubSubSub
,
datasetID
,
tableID
,
columnName
string
)
error
{
// projectID := "my-project-id"
// dataProject := "bigquery-public-data"
// pubSubTopic := "dlp-risk-sample-topic"
// pubSubSub := "dlp-risk-sample-sub"
// datasetID := "nhtsa_traffic_fatalities"
// tableID := "accident_2015"
// columnName := "state_number"
ctx
:=
context
.
Background
()
client
,
err
:=
dlp
.
NewClient
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"dlp.NewClient: %w"
,
err
)
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
err
}
defer
pubsubClient
.
Close
()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t
:=
pubsubClient
.
Topic
(
pubSubTopic
)
topicExists
,
err
:=
t
.
Exists
(
ctx
)
if
err
!=
nil
{
return
err
}
if
!
topicExists
{
if
t
,
err
=
pubsubClient
.
CreateTopic
(
ctx
,
pubSubTopic
);
err
!=
nil
{
return
err
}
}
// Create the Subscription if it doesn't exist.
s
:=
pubsubClient
.
Subscription
(
pubSubSub
)
subExists
,
err
:=
s
.
Exists
(
ctx
)
if
err
!=
nil
{
return
err
}
if
!
subExists
{
if
s
,
err
=
pubsubClient
.
CreateSubscription
(
ctx
,
pubSubSub
,
pubsub
.
SubscriptionConfig
{
Topic
:
t
});
err
!=
nil
{
return
err
}
}
// topic is the PubSub topic string where messages should be sent.
topic
:=
"projects/"
+
projectID
+
"/topics/"
+
pubSubTopic
// Create a configured request.
req
:=
& dlppb
.
CreateDlpJobRequest
{
Parent
:
fmt
.
Sprintf
(
"projects/%s/locations/global"
,
projectID
),
Job
:
& dlppb
.
CreateDlpJobRequest_RiskJob
{
RiskJob
:
& dlppb
.
RiskAnalysisJobConfig
{
// PrivacyMetric configures what to compute.
PrivacyMetric
:
& dlppb
.
PrivacyMetric
{
Type
:
& dlppb
.
PrivacyMetric_CategoricalStatsConfig_
{
CategoricalStatsConfig
:
& dlppb
.
PrivacyMetric_CategoricalStatsConfig
{
Field
:
& dlppb
.
FieldId
{
Name
:
columnName
,
},
},
},
},
// SourceTable describes where to find the data.
SourceTable
:
& dlppb
.
BigQueryTable
{
ProjectId
:
dataProject
,
DatasetId
:
datasetID
,
TableId
:
tableID
,
},
// Send a message to PubSub using Actions.
Actions
:
[]
*
dlppb
.
Action
{
{
Action
:
& dlppb
.
Action_PubSub
{
PubSub
:
& dlppb
.
Action_PublishToPubSub
{
Topic
:
topic
,
},
},
},
},
},
},
}
// Create the risk job.
j
,
err
:=
client
.
CreateDlpJob
(
ctx
,
req
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"CreateDlpJob: %w"
,
err
)
}
fmt
.
Fprintf
(
w
,
"Created job: %v\n"
,
j
.
GetName
())
// Wait for the risk job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
10
*
time
.
Minute
)
defer
cancel
()
err
=
s
.
Receive
(
ctx
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
// If this is the wrong job, do not process the result.
if
msg
.
Attributes
[
"DlpJobName"
]
!=
j
.
GetName
()
{
msg
.
Nack
()
return
}
msg
.
Ack
()
time
.
Sleep
(
500
*
time
.
Millisecond
)
resp
,
err
:=
client
.
GetDlpJob
(
ctx
,
& dlppb
.
GetDlpJobRequest
{
Name
:
j
.
GetName
(),
})
if
err
!=
nil
{
fmt
.
Fprintf
(
w
,
"GetDlpJob: %v"
,
err
)
return
}
h
:=
resp
.
GetRiskDetails
().
GetCategoricalStatsResult
().
GetValueFrequencyHistogramBuckets
()
for
i
,
b
:=
range
h
{
fmt
.
Fprintf
(
w
,
"Histogram bucket %v\n"
,
i
)
fmt
.
Fprintf
(
w
,
" Most common value occurs %v times\n"
,
b
.
GetValueFrequencyUpperBound
())
fmt
.
Fprintf
(
w
,
" Least common value occurs %v times\n"
,
b
.
GetValueFrequencyLowerBound
())
fmt
.
Fprintf
(
w
,
" %v unique values total\n"
,
b
.
GetBucketSize
())
for
_
,
v
:=
range
b
.
GetBucketValues
()
{
fmt
.
Fprintf
(
w
,
" Value %v occurs %v times\n"
,
v
.
GetValue
(),
v
.
GetCount
())
}
}
// Stop listening for more messages.
cancel
()
})
if
err
!=
nil
{
return
fmt
.
Errorf
(
"Receive: %w"
,
err
)
}
return
nil
}
Java
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
com.google.api.core. SettableApiFuture
;
import
com.google.cloud.dlp.v2. DlpServiceClient
;
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.privacy.dlp.v2. Action
;
import
com.google.privacy.dlp.v2. Action
. PublishToPubSub
;
import
com.google.privacy.dlp.v2. AnalyzeDataSourceRiskDetails
. CategoricalStatsResult
;
import
com.google.privacy.dlp.v2. AnalyzeDataSourceRiskDetails
. CategoricalStatsResult
. CategoricalStatsHistogramBucket
;
import
com.google.privacy.dlp.v2. BigQueryTable
;
import
com.google.privacy.dlp.v2. CreateDlpJobRequest
;
import
com.google.privacy.dlp.v2. DlpJob
;
import
com.google.privacy.dlp.v2. FieldId
;
import
com.google.privacy.dlp.v2. GetDlpJobRequest
;
import
com.google.privacy.dlp.v2. LocationName
;
import
com.google.privacy.dlp.v2. PrivacyMetric
;
import
com.google.privacy.dlp.v2. PrivacyMetric
. CategoricalStatsConfig
;
import
com.google.privacy.dlp.v2. RiskAnalysisJobConfig
;
import
com.google.privacy.dlp.v2. ValueFrequency
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. ProjectTopicName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
java.io.IOException
;
import
java.util.List
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
class
RiskAnalysisCategoricalStats
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
String
datasetId
=
"your-bigquery-dataset-id"
;
String
tableId
=
"your-bigquery-table-id"
;
String
topicId
=
"pub-sub-topic"
;
String
subscriptionId
=
"pub-sub-subscription"
;
categoricalStatsAnalysis
(
projectId
,
datasetId
,
tableId
,
topicId
,
subscriptionId
);
}
public
static
void
categoricalStatsAnalysis
(
String
projectId
,
String
datasetId
,
String
tableId
,
String
topicId
,
String
subscriptionId
)
throws
ExecutionException
,
InterruptedException
,
IOException
{
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try
(
DlpServiceClient
dlpServiceClient
=
DlpServiceClient
.
create
())
{
// Specify the BigQuery table to analyze
BigQueryTable
bigQueryTable
=
BigQueryTable
.
newBuilder
()
.
setProjectId
(
projectId
)
.
setDatasetId
(
datasetId
)
.
setTableId
(
tableId
)
.
build
();
// The name of the column to analyze, which doesn't need to contain numerical data
String
columnName
=
"Mystery"
;
// Configure the privacy metric for the job
FieldId
fieldId
=
FieldId
.
newBuilder
().
setName
(
columnName
).
build
();
CategoricalStatsConfig
categoricalStatsConfig
=
CategoricalStatsConfig
.
newBuilder
().
setField
(
fieldId
).
build
();
PrivacyMetric
privacyMetric
=
PrivacyMetric
.
newBuilder
().
setCategoricalStatsConfig
(
categoricalStatsConfig
).
build
();
// Create action to publish job status notifications over Google Cloud Pub/Sub
ProjectTopicName
topicName
=
ProjectTopicName
.
of
(
projectId
,
topicId
);
PublishToPubSub
publishToPubSub
=
PublishToPubSub
.
newBuilder
().
setTopic
(
topicName
.
toString
()).
build
();
Action
action
=
Action
.
newBuilder
().
setPubSub
(
publishToPubSub
).
build
();
// Configure the risk analysis job to perform
RiskAnalysisJobConfig
riskAnalysisJobConfig
=
RiskAnalysisJobConfig
.
newBuilder
()
.
setSourceTable
(
bigQueryTable
)
.
setPrivacyMetric
(
privacyMetric
)
.
addActions
(
action
)
.
build
();
// Build the job creation request to be sent by the client
CreateDlpJobRequest
createDlpJobRequest
=
CreateDlpJobRequest
.
newBuilder
()
.
setParent
(
LocationName
.
of
(
projectId
,
"global"
).
toString
())
.
setRiskJob
(
riskAnalysisJobConfig
)
.
build
();
// Send the request to the API using the client
DlpJob
dlpJob
=
dlpServiceClient
.
createDlpJob
(
createDlpJobRequest
);
// Set up a Pub/Sub subscriber to listen on the job completion status
final
SettableApiFuture<Boolean>
done
=
SettableApiFuture
.
create
();
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
MessageReceiver
messageHandler
=
(
PubsubMessage
pubsubMessage
,
AckReplyConsumer
ackReplyConsumer
)
-
>
{
handleMessage
(
dlpJob
,
done
,
pubsubMessage
,
ackReplyConsumer
);
};
Subscriber
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
messageHandler
).
build
();
subscriber
.
startAsync
();
// Wait for job completion semi-synchronously
// For long jobs, consider using a truly asynchronous execution model such as Cloud Functions
try
{
done
.
get
(
15
,
TimeUnit
.
MINUTES
);
}
catch
(
TimeoutException
e
)
{
System
.
out
.
println
(
"Job was not completed after 15 minutes."
);
return
;
}
finally
{
subscriber
.
stopAsync
();
subscriber
.
awaitTerminated
();
}
// Build a request to get the completed job
GetDlpJobRequest
getDlpJobRequest
=
GetDlpJobRequest
.
newBuilder
().
setName
(
dlpJob
.
getName
()).
build
();
// Retrieve completed job status
DlpJob
completedJob
=
dlpServiceClient
.
getDlpJob
(
getDlpJobRequest
);
System
.
out
.
println
(
"Job status: "
+
completedJob
.
getState
());
System
.
out
.
println
(
"Job name: "
+
dlpJob
.
getName
());
// Get the result and parse through and process the information
CategoricalStatsResult
result
=
completedJob
.
getRiskDetails
().
getCategoricalStatsResult
();
List<CategoricalStatsHistogramBucket>
histogramBucketList
=
result
.
getValueFrequencyHistogramBucketsList
();
for
(
CategoricalStatsHistogramBucket
bucket
:
histogramBucketList
)
{
long
mostCommonFrequency
=
bucket
.
getValueFrequencyUpperBound
();
System
.
out
.
printf
(
"Most common value occurs %d time(s).\n"
,
mostCommonFrequency
);
long
leastCommonFrequency
=
bucket
.
getValueFrequencyLowerBound
();
System
.
out
.
printf
(
"Least common value occurs %d time(s).\n"
,
leastCommonFrequency
);
for
(
ValueFrequency
valueFrequency
:
bucket
.
getBucketValuesList
())
{
System
.
out
.
printf
(
"Value %s occurs %d time(s).\n"
,
valueFrequency
.
getValue
().
toString
(),
valueFrequency
.
getCount
());
}
}
}
}
// handleMessage injects the job and settableFuture into the message reciever interface
private
static
void
handleMessage
(
DlpJob
job
,
SettableApiFuture<Boolean>
done
,
PubsubMessage
pubsubMessage
,
AckReplyConsumer
ackReplyConsumer
)
{
String
messageAttribute
=
pubsubMessage
.
getAttributesMap
().
get
(
"DlpJobName"
);
if
(
job
.
getName
().
equals
(
messageAttribute
))
{
done
.
set
(
true
);
ack
ReplyConsumer .
ack
();
}
else
{
ackReplyConsumer
.
nack
();
}
}
}
Node.js
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// Import the Google Cloud client libraries
// (module specifiers must be exact package names: no surrounding whitespace)
const DLP = require('@google-cloud/dlp');
const {PubSub} = require('@google-cloud/pubsub');

// Instantiates clients
const dlp = new DLP.DlpServiceClient();
const pubsub = new PubSub();

// The project ID to run the API call under
// const projectId = 'my-project';

// The project ID the table is stored under
// This may or (for public datasets) may not equal the calling project ID
// const tableProjectId = 'my-project';

// The ID of the dataset to inspect, e.g. 'my_dataset'
// const datasetId = 'my_dataset';

// The ID of the table to inspect, e.g. 'my_table'
// const tableId = 'my_table';

// The name of the Pub/Sub topic to notify once the job completes
// TODO(developer): create a Pub/Sub topic to use for this
// const topicId = 'MY-PUBSUB-TOPIC'

// The name of the Pub/Sub subscription to use when listening for job
// completion notifications
// TODO(developer): create a Pub/Sub subscription to use for this
// const subscriptionId = 'MY-PUBSUB-SUBSCRIPTION'

// The name of the column to compute risk metrics for, e.g. 'firstName'
// const columnName = 'firstName';
/**
 * Runs a DLP categorical stats risk-analysis job on one BigQuery column,
 * waits for its Pub/Sub completion notification, and prints the resulting
 * value-frequency histogram.
 *
 * Reads projectId, tableProjectId, datasetId, tableId, topicId,
 * subscriptionId and columnName from the enclosing scope.
 */
async function categoricalRiskAnalysis() {
  const sourceTable = {
    projectId: tableProjectId,
    datasetId: datasetId,
    tableId: tableId,
  };

  // Construct request for creating a risk analysis job
  const request = {
    parent: `projects/${projectId}/locations/global`,
    riskJob: {
      privacyMetric: {
        categoricalStatsConfig: {
          field: {
            name: columnName,
          },
        },
      },
      sourceTable: sourceTable,
      actions: [
        {
          pubSub: {
            topic: `projects/${projectId}/topics/${topicId}`,
          },
        },
      ],
    },
  };

  // Create helper function for unpacking values
  const getValue = obj => obj[Object.keys(obj)[0]];

  // Run risk analysis job
  const [topicResponse] = await pubsub.topic(topicId).get();
  const subscription = await topicResponse.subscription(subscriptionId);
  const [jobsResponse] = await dlp.createDlpJob(request);
  const jobName = jobsResponse.name;
  console.log(`Job created. Job name: ${jobName}`);

  // Watch the Pub/Sub topic until the DLP job finishes
  await new Promise((resolve, reject) => {
    const messageHandler = message => {
      if (message.attributes && message.attributes.DlpJobName === jobName) {
        message.ack();
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
        resolve(jobName);
      } else {
        message.nack();
      }
    };

    const errorHandler = err => {
      subscription.removeListener('message', messageHandler);
      subscription.removeListener('error', errorHandler);
      reject(err);
    };

    subscription.on('message', messageHandler);
    subscription.on('error', errorHandler);
  });

  // Pause before reading the job back. The delay must be awaited: a bare
  // setTimeout callback would not hold back the getDlpJob call below.
  console.log(' Waiting for DLP job to fully complete');
  await new Promise(resolve => setTimeout(resolve, 500));

  const [job] = await dlp.getDlpJob({name: jobName});
  const histogramBuckets =
    job.riskDetails.categoricalStatsResult.valueFrequencyHistogramBuckets;
  histogramBuckets.forEach((histogramBucket, histogramBucketIdx) => {
    console.log(`Bucket ${histogramBucketIdx}:`);

    // Print bucket stats
    console.log(
      `  Most common value occurs ${histogramBucket.valueFrequencyUpperBound} time(s)`
    );
    console.log(
      `  Least common value occurs ${histogramBucket.valueFrequencyLowerBound} time(s)`
    );

    // Print bucket values
    console.log(`${histogramBucket.bucketSize} unique values total.`);
    histogramBucket.bucketValues.forEach(valueBucket => {
      console.log(
        `  Value ${getValue(valueBucket.value)} occurs ${valueBucket.count} time(s).`
      );
    });
  });
}
await
categoricalRiskAnalysis
();
PHP
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
use Google\Cloud\Dlp\V2\Action;
use Google\Cloud\Dlp\V2\Action\PublishToPubSub;
use Google\Cloud\Dlp\V2\BigQueryTable;
use Google\Cloud\Dlp\V2\Client\DlpServiceClient;
use Google\Cloud\Dlp\V2\CreateDlpJobRequest;
use Google\Cloud\Dlp\V2\DlpJob\JobState;
use Google\Cloud\Dlp\V2\FieldId;
use Google\Cloud\Dlp\V2\GetDlpJobRequest;
use Google\Cloud\Dlp\V2\PrivacyMetric;
use Google\Cloud\Dlp\V2\PrivacyMetric\CategoricalStatsConfig;
use Google\Cloud\Dlp\V2\RiskAnalysisJobConfig;
use Google\Cloud\PubSub\PubSubClient;
/**
 * Computes categorical statistics (a histogram of value frequencies) for a column of data in a
 * Google BigQuery table.
 *
 * Starts a DLP risk analysis job, polls an existing Pub/Sub subscription (for up to 10 minutes)
 * for the job's completion notification, then prints the histogram buckets.
 *
 * @param string $callingProjectId The project ID to run the API call under
 * @param string $dataProjectId The project ID containing the target BigQuery table
 * @param string $topicId The name of the Pub/Sub topic to notify once the job completes
 * @param string $subscriptionId The name of the Pub/Sub subscription to use when listening for job
 *                               completion notifications
 * @param string $datasetId The ID of the dataset to inspect
 * @param string $tableId The ID of the table to inspect
 * @param string $columnName The name of the column to compute risk metrics for, e.g. "age"
 */
function categorical_stats(
    string $callingProjectId,
    string $dataProjectId,
    string $topicId,
    string $subscriptionId,
    string $datasetId,
    string $tableId,
    string $columnName
): void {
    // Instantiate a client.
    $dlp = new DlpServiceClient();
    $pubsub = new PubSubClient();
    $topic = $pubsub->topic($topicId);

    // Construct risk analysis config
    $columnField = (new FieldId())
        ->setName($columnName);

    $statsConfig = (new CategoricalStatsConfig())
        ->setField($columnField);

    $privacyMetric = (new PrivacyMetric())
        ->setCategoricalStatsConfig($statsConfig);

    // Construct items to be analyzed
    $bigqueryTable = (new BigQueryTable())
        ->setProjectId($dataProjectId)
        ->setDatasetId($datasetId)
        ->setTableId($tableId);

    // Construct the action to run when job completes
    $pubSubAction = (new PublishToPubSub())
        ->setTopic($topic->name());

    $action = (new Action())
        ->setPubSub($pubSubAction);

    // Construct risk analysis job config to run
    $riskJob = (new RiskAnalysisJobConfig())
        ->setPrivacyMetric($privacyMetric)
        ->setSourceTable($bigqueryTable)
        ->setActions([$action]);

    // Submit request
    $parent = "projects/$callingProjectId/locations/global";
    $createDlpJobRequest = (new CreateDlpJobRequest())
        ->setParent($parent)
        ->setRiskJob($riskJob);
    $job = $dlp->createDlpJob($createDlpJobRequest);

    // Listen for job notifications via an existing topic/subscription.
    $subscription = $topic->subscription($subscriptionId);

    // Poll Pub/Sub using exponential backoff until job finishes
    // Consider using an asynchronous execution model such as Cloud Functions
    $attempt = 1;
    $startTime = time();
    do {
        foreach ($subscription->pull() as $message) {
            if (
                isset($message->attributes()['DlpJobName'])
                && $message->attributes()['DlpJobName'] === $job->getName()
            ) {
                $subscription->acknowledge($message);
                // Re-read the job until it leaves RUNNING to avoid a race with the DLP API,
                // pausing between reads so this loop does not hammer the API.
                $getDlpJobRequest = (new GetDlpJobRequest())
                    ->setName($job->getName());
                $job = $dlp->getDlpJob($getDlpJobRequest);
                while ($job->getState() == JobState::RUNNING) {
                    usleep(500000);
                    $job = $dlp->getDlpJob($getDlpJobRequest);
                }
                break 2; // break from parent do while
            }
        }
        print('Waiting for job to complete' . PHP_EOL);
        // Exponential backoff with max delay of 60 seconds
        sleep(min(60, pow(2, ++$attempt)));
    } while (time() - $startTime < 600); // 10 minute timeout

    // Print finding counts
    printf('Job %s status: %s' . PHP_EOL, $job->getName(), JobState::name($job->getState()));
    switch ($job->getState()) {
        case JobState::DONE:
            $histBuckets = $job->getRiskDetails()->getCategoricalStatsResult()->getValueFrequencyHistogramBuckets();

            foreach ($histBuckets as $bucketIndex => $histBucket) {
                // Print bucket stats
                printf('Bucket %s:' . PHP_EOL, $bucketIndex);
                printf('  Most common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyUpperBound());
                printf('  Least common value occurs %s time(s)' . PHP_EOL, $histBucket->getValueFrequencyLowerBound());
                printf('  %s unique value(s) total.' . PHP_EOL, $histBucket->getBucketSize());

                // Print bucket values
                foreach ($histBucket->getBucketValues() as $valueFrequency) {
                    printf(
                        '  Value %s occurs %s time(s).' . PHP_EOL,
                        $valueFrequency->getValue()->serializeToJsonString(),
                        $valueFrequency->getCount()
                    );
                }
            }
            break;
        case JobState::FAILED:
            $errors = $job->getErrors();
            printf('Job %s had errors:' . PHP_EOL, $job->getName());
            foreach ($errors as $error) {
                var_dump($error->getDetails());
            }
            break;
        case JobState::PENDING:
        case JobState::RUNNING:
            // Typically reached when the polling window ended before the notification arrived.
            print('Job has not completed. Consider a longer timeout or an asynchronous execution model' . PHP_EOL);
            break;
        default:
            print('Unexpected job state.' . PHP_EOL);
    }
}
Python
To learn how to install and use the client library for Sensitive Data Protection, see Sensitive Data Protection client libraries.
To authenticate to Sensitive Data Protection, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import
concurrent.futures
import
google.cloud.dlp
import
google.cloud.pubsub
def categorical_risk_analysis(
    project: str,
    table_project_id: str,
    dataset_id: str,
    table_id: str,
    column_name: str,
    topic_id: str,
    subscription_id: str,
    timeout: int = 300,
) -> None:
    """Uses the Data Loss Prevention API to compute risk metrics of a column
    of categorical data in a Google BigQuery table.

    Starts a categorical-stats risk job that publishes to ``topic_id`` when it
    finishes, listens on ``subscription_id`` for that notification, and then
    prints the job's value-frequency histogram.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        table_project_id: The Google Cloud project id where the BigQuery table
            is stored.
        dataset_id: The id of the dataset to inspect.
        table_id: The id of the table to inspect.
        column_name: The name of the column to compute risk metrics for.
        topic_id: The name of the Pub/Sub topic to notify once the job
            completes.
        subscription_id: The name of the Pub/Sub subscription to use when
            listening for job completion notifications.
        timeout: The number of seconds to wait for the job-completion
            notification before giving up.

    Returns:
        None; the response from the API is printed to the terminal.
    """

    def format_value(value) -> str:
        # A DLP Value holds exactly one populated typed member; print whichever
        # one is set instead of assuming the column contains integers.
        kind = type(value).pb(value).WhichOneof("type")
        return "" if kind is None else str(getattr(value, kind))

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the project id into full resource ids.
    topic = google.cloud.pubsub.PublisherClient.topic_path(project, topic_id)
    parent = f"projects/{project}/locations/global"

    # Location info of the BigQuery table.
    source_table = {
        "project_id": table_project_id,
        "dataset_id": dataset_id,
        "table_id": table_id,
    }

    # Tell the API where to send a notification when the job is complete.
    actions = [{"pub_sub": {"topic": topic}}]

    # Configure risk analysis job
    # Give the name of the categorical column to compute risk metrics for
    risk_job = {
        "privacy_metric": {"categorical_stats_config": {"field": {"name": column_name}}},
        "source_table": source_table,
        "actions": actions,
    }

    # Call API to start risk analysis job
    operation = dlp.create_dlp_job(request={"parent": parent, "risk_job": risk_job})

    def callback(message: google.cloud.pubsub_v1.subscriber.message.Message) -> None:
        # .get(): messages from other publishers may lack the DlpJobName attribute.
        if message.attributes.get("DlpJobName") == operation.name:
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(request={"name": operation.name})
            print(f"Job name: {job.name}")
            histogram_buckets = (
                job.risk_details.categorical_stats_result.value_frequency_histogram_buckets  # noqa: E501
            )
            # Print bucket stats
            for i, bucket in enumerate(histogram_buckets):
                print(f"Bucket {i}:")
                print(
                    "   Most common value occurs {} time(s)".format(
                        bucket.value_frequency_upper_bound
                    )
                )
                print(
                    "   Least common value occurs {} time(s)".format(
                        bucket.value_frequency_lower_bound
                    )
                )
                print(f"   {bucket.bucket_size} unique values total.")
                for value in bucket.bucket_values:
                    print(
                        "   Value {} occurs {} time(s)".format(
                            format_value(value.value), value.count
                        )
                    )
            subscription.set_result(None)
        else:
            # This is not the message we're looking for.
            message.drop()

    # Create a Pub/Sub client and find the subscription. The subscription is
    # expected to already be listening to the topic.
    subscriber = google.cloud.pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_id)
    subscription = subscriber.subscribe(subscription_path, callback)

    try:
        subscription.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        print(
            "No event received before the timeout. Please verify that the "
            "subscription provided is subscribed to the topic provided."
        )
    finally:
        # The streaming-pull future is stopped with cancel() (it has no
        # close()); closing the client then releases its channel.
        subscription.cancel()
        subscriber.close()