Demonstrates how to convert a Pub/Sub message into a notification message
Code sample
Go
To authenticate to Security Command Center, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .
import
(
"bytes"
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/securitycenter/apiv1/securitycenterpb"
"github.com/golang/protobuf/jsonpb"
)
func
receiveMessages
(
w
io
.
Writer
,
projectID
string
,
subscriptionName
string
)
error
{
// projectID := "your-project-id"
// subsriptionName := "your-subscription-name"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
defer
client
.
Close
()
sub
:=
client
.
Subscription
(
subscriptionName
)
cctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
err
=
sub
.
Receive
(
cctx
,
func
(
ctx
context
.
Context
,
msg
*
pubsub
.
Message
)
{
var
notificationMessage
=
new
(
securitycenterpb
.
NotificationMessage
)
jsonpb
.
Unmarshal
(
bytes
.
NewReader
(
msg
.
Data
),
notificationMessage
)
fmt
.
Fprintln
(
w
,
"Got finding: "
,
notificationMessage
.
GetFinding
())
msg
.
Ack
()
cancel
()
})
if
err
!=
nil
{
return
fmt
.
Errorf
(
"Receive: %w"
,
err
)
}
return
nil
}
Java
To authenticate to Security Command Center, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .
import
com.google.cloud.pubsub.v1. AckReplyConsumer
;
import
com.google.cloud.pubsub.v1. MessageReceiver
;
import
com.google.cloud.pubsub.v1. Subscriber
;
import
com.google.cloud.securitycenter.v1. NotificationMessage
;
import
com.google.protobuf. InvalidProtocolBufferException
;
import
com.google.protobuf.util. JsonFormat
;
import
com.google.pubsub.v1. ProjectSubscriptionName
;
import
com.google.pubsub.v1. PubsubMessage
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
public
class
NotificationReceiver
{
private
NotificationReceiver
()
{
}
public
static
void
receiveNotificationMessages
(
String
projectId
,
String
subscriptionId
)
{
// String projectId = "{your-project}";
// String subscriptionId = "{your-subscription}";
ProjectSubscriptionName
subscriptionName
=
ProjectSubscriptionName
.
of
(
projectId
,
subscriptionId
);
try
{
Subscriber
subscriber
=
Subscriber
.
newBuilder
(
subscriptionName
,
new
NotificationMessageReceiver
()).
build
();
subscriber
.
startAsync
().
awaitRunning
();
// This sets the timeout value of the subscriber to 10s.
subscriber
.
awaitTerminated
(
10_000
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
IllegalStateException
|
TimeoutException
e
)
{
System
.
out
.
println
(
"Subscriber stopped: "
+
e
);
}
}
static
class
NotificationMessageReceiver
implements
MessageReceiver
{
@Override
public
void
receiveMessage
(
PubsubMessage
message
,
AckReplyConsumer
consumer
)
{
NotificationMessage
.
Builder
notificationMessageBuilder
=
NotificationMessage
.
newBuilder
();
try
{
String
jsonString
=
message
.
getData
().
toStringUtf8
();
JsonFormat
.
parser
().
merge
(
jsonString
,
notificationMessageBuilder
);
NotificationMessage
notificationMessage
=
notificationMessageBuilder
.
build
();
System
.
out
.
println
(
String
.
format
(
"Config id: %s"
,
notificationMessage
.
getNotificationConfigName
()));
System
.
out
.
println
(
String
.
format
(
"Finding: %s"
,
notificationMessage
.
getFinding
()));
}
catch
(
InvalidProtocolBufferException
e
)
{
System
.
out
.
println
(
"Could not parse message: "
+
e
);
}
finally
{
consumer
.
ack
();
}
}
}
}
Node.js
To authenticate to Security Command Center, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .
const
{
PubSub
}
=
require
(
' @google-cloud/pubsub
'
);
const
{
StringDecoder
}
=
require
(
'string_decoder'
);
// projectId = 'your-project-id'
// subscriptionId = 'your-subscription-id'
const
subscriptionName
=
'projects/'
+
projectId
+
'/subscriptions/'
+
subscriptionId
;
const
pubSubClient
=
new
PubSub
();
function
listenForMessages
()
{
const
subscription
=
pubSubClient
.
subscription
(
subscriptionName
);
// message.data is a buffer array of json
// 1. Convert buffer to normal string
// 2. Convert json to NotificationMessage object
const
messageHandler
=
message
=
>
{
const
jsonString
=
new
StringDecoder
(
'utf-8'
).
write
(
message
.
data
);
const
parsedNotificationMessage
=
JSON
.
parse
(
jsonString
);
console
.
log
(
parsedNotificationMessage
);
console
.
log
(
parsedNotificationMessage
.
finding
);
// ACK when done with message
message
.
ack
();
};
subscripti on
.
on
(
'message'
,
messageHandler
);
// Set timeout to 10 seconds
setTimeout
(()
=
>
{
subscription
.
removeListener
(
'message'
,
messageHandler
);
},
10000
);
}
await
listenForMessages
();
PHP
To authenticate to Security Command Center, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .
use Google\Cloud\PubSub\PubSubClient;
/**
* @param string $projectId Your Cloud Project ID
* @param string $subscriptionId Your subscription ID
*/
function receive_notification(string $projectId, string $subscriptionId): void
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionId);
foreach ($subscription->pull() as $message) {
printf('Message: %s' . PHP_EOL, $message->data());
// Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
$subscription->acknowledge($message);
}
}
Python
To authenticate to Security Command Center, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .
# Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python
import
concurrent
from
google.cloud
import
pubsub_v1
from
google.cloud.securitycenter_v1
import
NotificationMessage
from
google.protobuf.json_format
import
ParseError
# TODO: project_id = "your-project-id"
# TODO: subscription_name = "your-subscription-name"
def
callback
(
message
):
# Print the data received for debugging purpose if needed
print
(
f
"Received message:
{
message
.
data
}
"
)
try
:
notification_msg
=
NotificationMessage
.
from_json
(
message
.
data
)
print
(
"Notification config name: "
f
"
{
notification_msg
.
notification_config_name
}
"
)
print
(
f
"Finding:
{
notification_msg
.
finding
}
"
)
except
ParseError
:
print
(
"Could not parse received message as a NotificationMessage."
)
# Ack the message to prevent it from being pulled again
message
.
ack
()
subscriber
=
pubsub_v1
.
SubscriberClient
()
subscription_path
=
subscriber
.
subscription_path
(
project_id
,
subscription_name
)
streaming_pull_future
=
subscribe
r .
subscribe
(
subscription_path
,
callback
=
callback
)
print
(
f
"Listening for messages on
{
subscription_path
}
...
\n
"
)
try
:
streaming_pull_future
.
result
(
timeout
=
1
)
# Block for 1 second
except
concurrent
.
futures
.
TimeoutError
:
streaming_pull_future
.
cancel
()
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .