This document shows you how to publish messages to a topic with a schema.
Before you begin
Before configuring the publish workflow, ensure you have completed the following tasks:
- Understand how Pub/Sub schemas work .
- Associate a schema with a topic .
Required roles
To get the permissions that
you need to publish messages to a topic,
ask your administrator to grant you the Pub/Sub Publisher
( roles/pubsub.publisher
)
IAM role on the topic.
For more information about granting roles, see Manage access to projects, folders, and organizations
.
You might also be able to get the required permissions through custom roles or other predefined roles .
You need additional permissions to create or update topics and subscriptions.Publish messages with schema
You can publish messages to a topic that is associated with a schema. You must encode the messages in the schema and format that you specified when you created the topic . A message matches the schema associated with the topic if it matches any of the schema's revision in the allowed range of revisions . Messages are evaluated against revisions in order from the most recent allowed revision until either a match is found or the oldest allowed revision is reached. Pub/Sub adds the following attributes to a message successfully published to a topic associated with a schema:
-
googclient_schemaname
: The name of the schema used for validation. -
googclient_schemaencoding
: The encoding of the message, either JSON or BINARY. -
googclient_schemarevisionid
: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID.
When a message does not match any of the schema revisions allowed by
the topic, Pub/Sub returns an INVALID_ARGUMENT
error to the publish request.
Pub/Sub only evaluates messages against schema revisions at publish time. Committing a new new schema revision or changing the schema associated with a topic after publishing a message does not re-evaluate that message nor change any of the attached schema message attributes.
You can publish messages to a topic with an associated schema in a Google Cloud project using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
-
Publish a sample message using the gcloud pubsub topics publish command.
gcloud pubsub topics publish TOPIC_ID \ --message = MESSAGE
Replace the following:
-
TOPIC_ID : Name of the topic that you already created.
-
MESSAGE : Message published to the topic. A sample message can be
{"name": "Alaska", "post_abbr": "AK"}
.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .
Avro namespace
pubsub
=
::
google
::
cloud
::
pubsub
;
using
::
google
::
cloud
::
future
;
using
::
google
::
cloud
::
StatusOr
;
[](
pubsub
::
Publisher
publisher
)
{
auto
constexpr
kNewYork
=
R
"
js(
{ "name": "New York", "post_abbr": "NY" }
)js
"
;
auto
constexpr
kPennsylvania
=
R
"
js(
{ "name": "Pennsylvania", "post_abbr": "PA" }
)js
"
;
std
::
vector<future<void>
>
done
;
auto
handler
=
[](
future<StatusOr<std
::
string
>>
f
)
{
auto
id
=
f
.
get
();
if
(
!
id
)
throw
std
::
move
(
id
).
status
();
};
for
(
auto
const
*
data
:
{
kNewYork
,
kPennsylvania
})
{
done
.
push_back
(
publisher
.
Publish
(
pubsub
::
MessageBuilder
{}.
SetData
(
data
).
Build
())
.
then
(
handler
));
}
// Block until all messages are published.
for
(
auto
&
d
:
done
)
d
.
get
();
}
namespace
pubsub
=
::
google
::
cloud
::
pubsub
;
using
::
google
::
cloud
::
future
;
using
::
google
::
cloud
::
StatusOr
;
[](
pubsub
::
Publisher
publisher
)
{
std
::
vector<std
::
pair<std
::
string
,
std
::
string
>>
states
{
{
"New York"
,
"NY"
},
{
"Pennsylvania"
,
"PA"
},
};
std
::
vector<future<void>
>
done
;
auto
handler
=
[](
future<StatusOr<std
::
string
>>
f
)
{
auto
id
=
f
.
get
();
if
(
!
id
)
throw
std
::
move
(
id
).
status
();
};
for
(
auto
&
data
:
states
)
{
google
::
cloud
::
pubsub
::
samples
::
State
state
;
state
.
set_name
(
data
.
first
);
state
.
set_post_abbr
(
data
.
second
);
done
.
push_back
(
publisher
.
Publish
(
pubsub
::
MessageBuilder
{}
.
SetData
(
state
.
SerializeAsString
())
.
Build
())
.
then
(
handler
));
}
// Block until all messages are published.
for
(
auto
&
d
:
done
)
d
.
get
();
}
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .
Avro using
Avro.IO
;
using
Avro.Specific
;
using
Google.Cloud.PubSub.V1
;
using
System
;
using
System.Collections.Generic
;
using
System.IO
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading.Tasks
;
public
class
PublishAvroMessagesAsyncSample
{
public
async
Task<int>
PublishAvroMessagesAsync
(
string
projectId
,
string
topicId
,
IEnumerable<AvroUtilities
.
State
>
messageStates
)
{
TopicName
topicName
=
TopicName
.
FromProjectTopic
(
projectId
,
topicId
);
PublisherClient
publisher
=
await
PublisherClient
.
CreateAsync
(
topicName
);
PublisherServiceApiClient
publishApi
=
PublisherServiceApiClient
.
Create
();
var
topic
=
publishApi
.
GetTopic
(
topicName
);
int
publishedMessageCount
=
0
;
var
publishTasks
=
messageStates
.
Select
(
async
state
=
>
{
try
{
string
messageId
=
null
;
switch
(
topic
.
SchemaSettings
.
Encoding
)
{
case
Encoding
.
Binary
:
using
(
var
ms
=
new
MemoryStream
())
{
var
encoder
=
new
BinaryEncoder
(
ms
);
var
writer
=
new
SpecificDefaultWriter
(
state
.
Schema
);
writer
.
Write
(
state
,
encoder
);
messageId
=
await
publisher
.
PublishAsync
(
ms
.
ToArray
());
}
break
;
case
Encoding
.
Json
:
var
jsonMessage
=
AvroUtilities
.
StateUtils
.
StateToJsonString
(
state
);
messageId
=
await
publisher
.
PublishAsync
(
jsonMessage
);
break
;
}
Console
.
WriteLine
(
$"Published message {messageId}"
);
Interlocked
.
Increment
(
ref
publishedMessageCount
);
}
catch
(
Exception
exception
)
{
Console
.
WriteLine
(
$"An error occurred when publishing message {state}: {exception.Message}"
);
}
});
await
Task
.
WhenAll
(
publishTasks
);
return
publishedMessageCount
;
}
}
using
Google.Cloud.PubSub.V1
;
using
Google.Protobuf
;
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading.Tasks
;
public
class
PublishProtoMessagesAsyncSample
{
public
async
Task<int>
PublishProtoMessagesAsync
(
string
projectId
,
string
topicId
,
IEnumerable<Utilities
.
State
>
messageStates
)
{
TopicName
topicName
=
TopicName
.
FromProjectTopic
(
projectId
,
topicId
);
PublisherClient
publisher
=
await
PublisherClient
.
CreateAsync
(
topicName
);
PublisherServiceApiClient
publishApi
=
PublisherServiceApiClient
.
Create
();
var
topic
=
publishApi
.
GetTopic
(
topicName
);
int
publishedMessageCount
=
0
;
var
publishTasks
=
messageStates
.
Select
(
async
state
=
>
{
try
{
string
messageId
=
null
;
switch
(
topic
.
SchemaSettings
.
Encoding
)
{
case
Encoding
.
Binary
:
var
binaryMessage
=
state
.
ToByteString
();
messageId
=
await
publisher
.
PublishAsync
(
binaryMessage
);
break
;
case
Encoding
.
Json
:
var
jsonMessage
=
JsonFormatter
.
Default
.
Format
(
state
);
messageId
=
await
publisher
.
PublishAsync
(
jsonMessage
);
break
;
}
Console
.
WriteLine
(
$"Published message {messageId}"
);
Interlocked
.
Increment
(
ref
publishedMessageCount
);
}
catch
(
Exception
exception
)
{
Console
.
WriteLine
(
$"An error occurred when publishing message {state}: {exception.Message}"
);
}
});
await
Task
.
WhenAll
(
publishTasks
);
return
publishedMessageCount
;
}
}
Go
The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .
Avro import
(
"context"
"fmt"
"io"
"os"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
"github.com/linkedin/goavro/v2"
)
func
publishAvroRecords
(
w
io
.
Writer
,
projectID
,
topicID
,
avscFile
string
)
error
{
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
avroSource
,
err
:=
os
.
ReadFile
(
avscFile
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"os.ReadFile err: %w"
,
err
)
}
codec
,
err
:=
goavro
.
NewCodec
(
string
(
avroSource
))
if
err
!=
nil
{
return
fmt
.
Errorf
(
"goavro.NewCodec err: %w"
,
err
)
}
record
:=
map
[
string
]
interface
{}{
"name"
:
"Alaska"
,
"post_abbr"
:
"AK"
}
// Get the topic encoding type.
req
:=
& pubsubpb
.
GetTopicRequest
{
Topic
:
fmt
.
Sprintf
(
"projects/%s/topics/%s"
,
projectID
,
topicID
),
}
t
,
err
:=
client
.
TopicAdminClient
.
GetTopic
(
ctx
,
req
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"got err in GetTopic: %w"
,
err
)
}
encoding
:=
t
.
SchemaSettings
.
Encoding
var
msg
[]
byte
switch
encoding
{
case
pubsubpb
.
Encoding_BINARY
:
msg
,
err
=
codec
.
BinaryFromNative
(
nil
,
record
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"codec.BinaryFromNative err: %w"
,
err
)
}
case
pubsubpb
.
Encoding_JSON
:
msg
,
err
=
codec
.
TextualFromNative
(
nil
,
record
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"codec.TextualFromNative err: %w"
,
err
)
}
default
:
return
fmt
.
Errorf
(
"invalid encoding: %v"
,
encoding
)
}
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
publisher
:=
client
.
Publisher
(
topicID
)
result
:=
publisher
.
Publish
(
ctx
,
& pubsub
.
Message
{
Data
:
msg
,
})
_
,
err
=
result
.
Get
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"result.Get: %w"
,
err
)
}
fmt
.
Fprintf
(
w
,
"Published avro record: %s\n"
,
string
(
msg
))
return
nil
}
import
(
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
statepb
"github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func
publishProtoMessages
(
w
io
.
Writer
,
projectID
,
topicID
string
)
error
{
// projectID := "my-project-id"
// topicID := "my-topic"
ctx
:=
context
.
Background
()
client
,
err
:=
pubsub
.
NewClient
(
ctx
,
projectID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"pubsub.NewClient: %w"
,
err
)
}
state
:=
& statepb
.
State
{
Name
:
"Alaska"
,
PostAbbr
:
"AK"
,
}
// Get the topic encoding type.
req
:=
& pubsubpb
.
GetTopicRequest
{
Topic
:
fmt
.
Sprintf
(
"projects/%s/topics/%s"
,
projectID
,
topicID
),
}
t
,
err
:=
client
.
TopicAdminClient
.
GetTopic
(
ctx
,
req
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"got err in GetTopic: %w"
,
err
)
}
encoding
:=
t
.
SchemaSettings
.
Encoding
var
msg
[]
byte
switch
encoding
{
case
pubsubpb
.
Encoding_BINARY
:
msg
,
err
=
proto
.
Marshal
(
state
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"proto.Marshal err: %w"
,
err
)
}
case
pubsubpb
.
Encoding_JSON
:
msg
,
err
=
protojson
.
Marshal
(
state
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"protojson.Marshal err: %w"
,
err
)
}
default
:
return
fmt
.
Errorf
(
"invalid encoding: %v"
,
encoding
)
}
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
publisher
:=
client
.
Publisher
(
topicID
)
result
:=
publisher
.
Publish
(
ctx
,
& pubsub
.
Message
{
Data
:
msg
,
})
_
,
err
=
result
.
Get
(
ctx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"result.Get: %w"
,
err
)
}
fmt
.
Fprintf
(
w
,
"Published proto message with %#v encoding: %s\n"
,
encoding
,
string
(
msg
))
return
nil
}
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .
Avro import
com.google.api.core. ApiFuture
;
import
com.google.cloud.pubsub.v1. Publisher
;
import
com.google.cloud.pubsub.v1. TopicAdminClient
;
import
com.google.protobuf. ByteString
;
import
com.google.pubsub.v1. Encoding
;
import
com.google.pubsub.v1. PubsubMessage
;
import
com.google.pubsub.v1. TopicName
;
import
java.io.ByteArrayOutputStream
;
import
java.io.IOException
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.avro.io.Encoder
;
import
org.apache.avro.io.EncoderFactory
;
import
utilities.State
;
public
class
PublishAvroRecordsExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use a topic created with an Avro schema.
String
topicId
=
"your-topic-id"
;
publishAvroRecordsExample
(
projectId
,
topicId
);
}
public
static
void
publishAvroRecordsExample
(
String
projectId
,
String
topicId
)
throws
IOException
,
ExecutionException
,
InterruptedException
{
Encoding
encoding
=
null
;
TopicName
topicName
=
TopicName
.
of
(
projectId
,
topicId
);
// Get the topic encoding type.
try
(
TopicAdminClient
topicAdminClient
=
TopicAdminClient
.
create
())
{
encoding
=
topicAdminClient
.
getTopic
(
topicName
).
getSchemaSettings
().
getEncoding
();
}
// Instantiate an avro-tools-generated class defined in `us-states.avsc`.
State
state
=
State
.
newBuilder
().
setName
(
"Alaska"
).
setPostAbbr
(
"AK"
).
build
();
Publisher
publisher
=
null
;
block
:
try
{
publisher
=
Publisher
.
newBuilder
(
topicName
).
build
();
// Prepare to serialize the object to the output stream.
ByteArrayOutputStream
byteStream
=
new
ByteArrayOutputStream
();
Encoder
encoder
=
null
;
// Prepare an appropriate encoder for publishing to the topic.
switch
(
encoding
)
{
case
BINARY
:
System
.
out
.
println
(
"Preparing a BINARY encoder..."
);
encoder
=
EncoderFactory
.
get
().
directBinaryEncoder
(
byteStream
,
/* reuse= */
null
);
break
;
case
JSON
:
System
.
out
.
println
(
"Preparing a JSON encoder..."
);
encoder
=
EncoderFactory
.
get
().
jsonEncoder
(
State
.
getClassSchema
(),
byteStream
);
break
;
default
:
break
block
;
}
// Encode the object and write it to the output stream.
state
.
customEncode
(
encoder
);
encoder
.
flush
();
// Publish the encoded object as a Pub/Sub message.
ByteString
data
=
ByteString
.
copyFrom
(
byteStream
.
toByteArray
());
PubsubMessage
message
=
PubsubMessage
.
newBuilder
().
setData
(
data
).
build
();
System
.
out
.
println
(
"Publishing message: "
+
message
);
ApiFuture<String>
future
=
publish
er .
publish
(
message
);
System
.
out
.
println
(
"Published message ID: "
+
future
.
get
());
}
finally
{
if
(
publisher
!=
null
)
{
publisher
.
shutdown
();
publisher
.
awaitTermination
(
1
,
TimeUnit
.
MINUTES
);
}
}
}
}
import
com.google.api.core. ApiFuture
;
import
com.google.cloud.pubsub.v1. Publisher
;
import
com.google.cloud.pubsub.v1. TopicAdminClient
;
import
com.google.protobuf. ByteString
;
import
com.google.protobuf.util. JsonFormat
;
import
com.google.pubsub.v1. Encoding
;
import
com.google.pubsub.v1. PubsubMessage
;
import
com.google.pubsub.v1. TopicName
;
import
java.io.IOException
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
utilities.StateProto.State
;
public
class
PublishProtobufMessagesExample
{
public
static
void
main
(
String
...
args
)
throws
Exception
{
// TODO(developer): Replace these variables before running the sample.
String
projectId
=
"your-project-id"
;
// Use a topic created with a proto schema.
String
topicId
=
"your-topic-id"
;
publishProtobufMessagesExample
(
projectId
,
topicId
);
}
public
static
void
publishProtobufMessagesExample
(
String
projectId
,
String
topicId
)
throws
IOException
,
ExecutionException
,
InterruptedException
{
Encoding
encoding
=
null
;
TopicName
topicName
=
TopicName
.
of
(
projectId
,
topicId
);
// Get the topic encoding type.
try
(
TopicAdminClient
topicAdminClient
=
TopicAdminClient
.
create
())
{
encoding
=
topicAdminClient
.
getTopic
(
topicName
).
getSchemaSettings
().
getEncoding
();
}
Publisher
publisher
=
null
;
// Instantiate a protoc-generated class defined in `us-states.proto`.
State
state
=
State
.
newBuilder
().
setName
(
"Alaska"
).
setPostAbbr
(
"AK"
).
build
();
block
:
try
{
publisher
=
Publisher
.
newBuilder
(
topicName
).
build
();
PubsubMessage
.
Builder
message
=
PubsubMessage
.
newBuilder
();
// Prepare an appropriately formatted message based on topic encoding.
switch
(
encoding
)
{
case
BINARY
:
message
.
setData
(
state
.
toByteString
());
System
.
out
.
println
(
"Publishing a BINARY-formatted message:\n"
+
message
);
break
;
case
JSON
:
String
jsonString
=
JsonFormat
.
printer
().
omittingInsignificantWhitespace
().
print
(
state
);
message
.
setData
(
ByteString
.
copyFromUtf8
(
jsonString
));
System
.
out
.
println
(
"Publishing a JSON-formatted message:\n"
+
message
);
break
;
default
:
break
block
;
}
// Publish the message.
ApiFuture<String>
future
=
publish
er .
publish
(
message
.
build
());
System
.
out
.
println
(
"Published message ID: "
+
future
.
get
());
}
finally
{
if
(
publisher
!=
null
)
{
publisher
.
shutdown
();
publisher
.
awaitTermination
(
1
,
TimeUnit
.
MINUTES
);
}
}
}
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .
Avro /**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const
{
PubSub
,
Encodings
}
=
require
(
' @google-cloud/pubsub
'
);
// And the Apache Avro library
const
avro
=
require
(
'avro-js'
);
const
fs
=
require
(
'fs'
);
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
async
function
publishAvroRecords
(
topicNameOrId
)
{
// Cache topic objects (publishers) and reuse them.
const
topic
=
pubSubClient
.
topic
(
topicNameOrId
);
// Get the topic metadata to learn about its schema encoding.
const
[
topicMetadata
]
=
await
topic
.
getMetadata
();
const
topicSchemaMetadata
=
topicMetadata
.
schemaSettings
;
if
(
!
topicSchemaMetadata
)
{
console
.
log
(
`Topic
${
topicNameOrId
}
doesn't seem to have a schema.`
);
return
;
}
const
schemaEncoding
=
topicSchemaMetadata
.
encoding
;
// Make an encoder using the official avro-js library.
const
definition
=
fs
.
readFileSync
(
'system-test/fixtures/provinces.avsc'
)
.
toString
();
const
type
=
avro
.
parse
(
definition
);
// Encode the message.
const
province
=
{
name
:
'Ontario'
,
post_abbr
:
'ON'
,
};
let
dataBuffer
;
switch
(
schemaEncoding
)
{
case
Encodings
.
Binary
:
dataBuffer
=
type
.
toBuffer
(
province
);
break
;
case
Encodings
.
Json
:
dataBuffer
=
Buffer
.
from
(
type
.
toString
(
province
));
break
;
default
:
console
.
log
(
`Unknown schema encoding:
${
schemaEncoding
}
`
);
break
;
}
if
(
!
dataBuffer
)
{
console
.
log
(
`Invalid encoding
${
schemaEncoding
}
on the topic.`
);
return
;
}
const
messageId
=
await
topic
.
publish
(
dataBuffer
);
console
.
log
(
`Avro record
${
messageId
}
published.`
);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const
{
PubSub
,
Encodings
}
=
require
(
' @google-cloud/pubsub
'
);
// And the protobufjs library
const
protobuf
=
require
(
'protobufjs'
);
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
async
function
publishProtobufMessages
(
topicNameOrId
)
{
// Cache topic objects (publishers) and reuse them.
const
topic
=
pubSubClient
.
topic
(
topicNameOrId
);
// Get the topic metadata to learn about its schema.
const
[
topicMetadata
]
=
await
topic
.
getMetadata
();
const
topicSchemaMetadata
=
topicMetadata
.
schemaSettings
;
if
(
!
topicSchemaMetadata
)
{
console
.
log
(
`Topic
${
topicNameOrId
}
doesn't seem to have a schema.`
);
return
;
}
const
schemaEncoding
=
topicSchemaMetadata
.
encoding
;
// Encode the message.
const
province
=
{
name
:
'Ontario'
,
postAbbr
:
'ON'
,
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const
root
=
await
protobuf
.
load
(
'system-test/fixtures/provinces.proto'
);
const
Province
=
root
.
lookupType
(
'utilities.Province'
);
const
message
=
Province
.
create
(
province
);
let
dataBuffer
;
switch
(
schemaEncoding
)
{
case
Encodings
.
Binary
:
dataBuffer
=
Buffer
.
from
(
Province
.
encode
(
message
).
finish
());
break
;
case
Encodings
.
Json
:
dataBuffer
=
Buffer
.
from
(
JSON
.
stringify
(
message
.
toJSON
()));
break
;
default
:
console
.
log
(
`Unknown schema encoding:
${
schemaEncoding
}
`
);
break
;
}
if
(
!
dataBuffer
)
{
console
.
log
(
`Invalid encoding
${
schemaEncoding
}
on the topic.`
);
return
;
}
const
messageId
=
await
topic
.
publishMessage
({
data
:
dataBuffer
});
console
.
log
(
`Protobuf message
${
messageId
}
published.`
);
}
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .
Avro /**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import
{
PubSub
,
Encodings
}
from
'@google-cloud/pubsub'
;
// And the Apache Avro library
import
*
as
avro
from
'avro-js'
;
import
*
as
fs
from
'fs'
;
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
interface
ProvinceObject
{
name
:
string
;
post_abbr
:
string
;
}
async
function
publishAvroRecords
(
topicNameOrId
:
string
)
{
// Cache topic objects (publishers) and reuse them.
const
topic
=
pubSubClient
.
topic
(
topicNameOrId
);
// Get the topic metadata to learn about its schema encoding.
const
[
topicMetadata
]
=
await
topic
.
getMetadata
();
const
topicSchemaMetadata
=
topicMetadata
.
schemaSettings
;
if
(
!
topicSchemaMetadata
)
{
console
.
log
(
`Topic
${
topicNameOrId
}
doesn't seem to have a schema.`
);
return
;
}
const
schemaEncoding
=
topicSchemaMetadata
.
encoding
;
// Make an encoder using the official avro-js library.
const
definition
=
fs
.
readFileSync
(
'system-test/fixtures/provinces.avsc'
)
.
toString
();
const
type
=
avro
.
parse
(
definition
);
// Encode the message.
const
province
:
ProvinceObject
=
{
name
:
'Ontario'
,
post_abbr
:
'ON'
,
};
let
dataBuffer
:
Buffer
|
undefined
;
switch
(
schemaEncoding
)
{
case
Encodings
.
Binary
:
dataBuffer
=
type
.
toBuffer
(
province
);
break
;
case
Encodings
.
Json
:
dataBuffer
=
Buffer
.
from
(
type
.
toString
(
province
));
break
;
default
:
console
.
log
(
`Unknown schema encoding:
${
schemaEncoding
}
`
);
break
;
}
if
(
!
dataBuffer
)
{
console
.
log
(
`Invalid encoding
${
schemaEncoding
}
on the topic.`
);
return
;
}
const
messageId
=
await
topic
.
publish
(
dataBuffer
);
console
.
log
(
`Avro record
${
messageId
}
published.`
);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import
{
PubSub
,
Encodings
}
from
'@google-cloud/pubsub'
;
// And the protobufjs library
import
*
as
protobuf
from
'protobufjs'
;
// Creates a client; cache this for further use
const
pubSubClient
=
new
PubSub
();
interface
ProvinceObject
{
name
:
string
;
postAbbr
:
string
;
}
async
function
publishProtobufMessages
(
topicNameOrId
:
string
)
{
// Cache topic objects (publishers) and reuse them.
const
topic
=
pubSubClient
.
topic
(
topicNameOrId
);
// Get the topic metadata to learn about its schema.
const
[
topicMetadata
]
=
await
topic
.
getMetadata
();
const
topicSchemaMetadata
=
topicMetadata
.
schemaSettings
;
if
(
!
topicSchemaMetadata
)
{
console
.
log
(
`Topic
${
topicNameOrId
}
doesn't seem to have a schema.`
);
return
;
}
const
schemaEncoding
=
topicSchemaMetadata
.
encoding
;
// Encode the message.
const
province
:
ProvinceObject
=
{
name
:
'Ontario'
,
postAbbr
:
'ON'
,
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const
root
=
await
protobuf
.
load
(
'system-test/fixtures/provinces.proto'
);
const
Province
=
root
.
lookupType
(
'utilities.Province'
);
const
message
=
Province
.
create
(
province
);
let
dataBuffer
:
Buffer
|
undefined
;
switch
(
schemaEncoding
)
{
case
Encodings
.
Binary
:
dataBuffer
=
Buffer
.
from
(
Province
.
encode
(
message
).
finish
());
break
;
case
Encodings
.
Json
:
dataBuffer
=
Buffer
.
from
(
JSON
.
stringify
(
message
.
toJSON
()));
break
;
default
:
console
.
log
(
`Unknown schema encoding:
${
schemaEncoding
}
`
);
break
;
}
if
(
!
dataBuffer
)
{
console
.
log
(
`Invalid encoding
${
schemaEncoding
}
on the topic.`
);
return
;
}
const
messageId
=
await
topic
.
publishMessage
({
data
:
dataBuffer
});
console
.
log
(
`Protobuf message
${
messageId
}
published.`
);
}
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub PHP API reference documentation .
Avro use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;
/**
* Publish a message using an AVRO schema.
*
* This sample uses `wikimedia/avro` for AVRO encoding.
*
* @param string $projectId
* @param string $topicId
* @param string $definitionFile
*/
function publish_avro_records($projectId, $topicId, $definitionFile)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$definition = (string) file_get_contents($definitionFile);
$messageData = [
'name' => 'Alaska',
'post_abbr' => 'AK',
];
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as AVRO binary.
$io = new AvroStringIO();
$schema = AvroSchema::parse($definition);
$writer = new AvroIODatumWriter($schema);
$encoder = new AvroIOBinaryEncoder($io);
$writer->write($messageData, $encoder);
$encodedMessageData = $io->string();
} else {
// encode as JSON.
$encodedMessageData = json_encode($messageData);
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use Utilities\StateProto;
/**
* Publish a message using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $topicId
* @return void
*/
function publish_proto_messages($projectId, $topicId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$messageData = new StateProto([
'name' => 'Alaska',
'post_abbr' => 'AK',
]);
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as protobuf binary.
$encodedMessageData = $messageData->serializeToString();
} else {
// encode as JSON.
$encodedMessageData = $messageData->serializeToJsonString();
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .
Avro from
avro.io
import
BinaryEncoder
,
DatumWriter
import
avro.schema
as
schema
import
io
import
json
from
google.api_core.exceptions
import
NotFound
from
google.cloud.pubsub
import
PublisherClient
from
google.pubsub_v1.types
import
Encoding
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
publisher_client
=
PublisherClient
()
topic_path
=
publisher_client
.
topic_path
(
project_id
,
topic_id
)
# Prepare to write Avro records to the binary output stream.
with
open
(
avsc_file
,
"rb"
)
as
file
:
avro_schema
=
schema
.
parse
(
file
.
read
())
writer
=
DatumWriter
(
avro_schema
)
bout
=
io
.
BytesIO
()
# Prepare some data using a Python dictionary that matches the Avro schema
record
=
{
"name"
:
"Alaska"
,
"post_abbr"
:
"AK"
}
try
:
# Get the topic encoding type.
topic
=
publisher_client
.
get_topic
(
request
=
{
"topic"
:
topic_path
})
encoding
=
topic
.
schema_settings
.
encoding
# Encode the data according to the message serialization type.
if
encoding
==
Encoding
.
BINARY
:
encoder
=
BinaryEncoder
(
bout
)
writer
.
write
(
record
,
encoder
)
data
=
bout
.
getvalue
()
print
(
f
"Preparing a binary-encoded message:
\n
{
data
.
decode
()
}
"
)
elif
encoding
==
Encoding
.
JSON
:
data_str
=
json
.
dumps
(
record
)
print
(
f
"Preparing a JSON-encoded message:
\n
{
data_str
}
"
)
data
=
data_str
.
encode
(
"utf-8"
)
else
:
print
(
f
"No encoding specified in
{
topic_path
}
. Abort."
)
exit
(
0
)
future
=
publish
er_client .
publish
(
topic_path
,
data
)
print
(
f
"Published message ID:
{
future
.
result
()
}
"
)
except
NotFound
:
print
(
f
"
{
topic_id
}
not found."
)
from
google.api_core.exceptions
import
NotFound
from
google.cloud.pubsub
import
PublisherClient
from
google.protobuf.json_format
import
MessageToJson
from
google.pubsub_v1.types
import
Encoding
from
utilities
import
us_states_pb2
# type: ignore
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_client
=
PublisherClient
()
topic_path
=
publisher_client
.
topic_path
(
project_id
,
topic_id
)
try
:
# Get the topic encoding type.
topic
=
publisher_client
.
get_topic
(
request
=
{
"topic"
:
topic_path
})
encoding
=
topic
.
schema_settings
.
encoding
# Instantiate a protoc-generated class defined in `us-states.proto`.
state
=
us_states_pb2
.
StateProto
()
state
.
name
=
"Alaska"
state
.
post_abbr
=
"AK"
# Encode the data according to the message serialization type.
if
encoding
==
Encoding
.
BINARY
:
data
=
state
.
SerializeToString
()
print
(
f
"Preparing a binary-encoded message:
\n
{
data
}
"
)
elif
encoding
==
Encoding
.
JSON
:
json_object
=
MessageToJson
(
state
)
data
=
str
(
json_object
)
.
encode
(
"utf-8"
)
print
(
f
"Preparing a JSON-encoded message:
\n
{
data
}
"
)
else
:
print
(
f
"No encoding specified in
{
topic_path
}
. Abort."
)
exit
(
0
)
future
=
publish
er_client .
publish
(
topic_path
,
data
)
print
(
f
"Published message ID:
{
future
.
result
()
}
"
)
except
NotFound
:
print
(
f
"
{
topic_id
}
not found."
)
Ruby
The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .
Avro # topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
pubsub
=
Google
::
Cloud
::
PubSub
.
new
topic_admin
=
pubsub
.
topic_admin
publisher
=
pubsub
.
publisher
topic_id
record
=
{
"name"
=
>
"Alaska"
,
"post_abbr"
=
>
"AK"
}
topic
=
topic_admin
.
get_topic
topic
:
pubsub
.
topic_path
(
topic_id
)
encoding
=
topic
.
schema_settings
.
encoding
case
encoding
when
:BINARY
require
"avro"
avro_schema
=
Avro
::
Schema
.
parse
File
.
read
(
avsc_file
)
writer
=
Avro
::
IO
::
DatumWriter
.
new
avro_schema
buffer
=
StringIO
.
new
encoder
=
Avro
::
IO
::
BinaryEncoder
.
new
buffer
writer
.
write
record
,
encoder
publisher
.
publish
buffer
puts
"Published binary-encoded AVRO message."
when
:JSON
require
"json"
publisher
.
publish
record
.
to_json
puts
"Published JSON-encoded AVRO message."
else
raise
"No encoding specified in
#{
topic
.
name
}
."
end
# topic_id = "your-topic-id"
pubsub
=
Google
::
Cloud
::
PubSub
.
new
topic_admin
=
pubsub
.
topic_admin
publisher
=
pubsub
.
publisher
topic_id
state
=
Utilities
::
StateProto
.
new
name
:
"Alaska"
,
post_abbr
:
"AK"
topic
=
topic_admin
.
get_topic
topic
:
pubsub
.
topic_path
(
topic_id
)
encoding
=
topic
.
schema_settings
.
encoding
case
encoding
when
:BINARY
publisher
.
publish
Utilities
::
StateProto
.
encode
(
state
)
puts
"Published binary-encoded protobuf message."
when
:JSON
publisher
.
publish
Utilities
::
StateProto
.
encode_json
(
state
)
puts
"Published JSON-encoded protobuf message."
else
raise
"No encoding specified in
#{
topic
.
name
}
."
end
What's next
-
To restrict the locations in which Pub/Sub stores message data, see Restricting Pub/Sub resource locations .
-
To learn more about receiving messages, see Choose a subscription type .