Publish messages of protobuf schema type

Publish messages that conform to a protocol buffer schema to a topic with a protocol buffer schema attached.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C++ API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 StatusOr 
 ; 
 []( 
 pubsub 
 :: 
 Publisher 
  
 publisher 
 ) 
  
 { 
  
 std 
 :: 
 vector<std 
 :: 
 pair<std 
 :: 
 string 
 , 
  
 std 
 :: 
 string 
>>  
 states 
 { 
  
 { 
 "New York" 
 , 
  
 "NY" 
 }, 
  
 { 
 "Pennsylvania" 
 , 
  
 "PA" 
 }, 
  
 }; 
  
 std 
 :: 
 vector<future<void> 
>  
 done 
 ; 
  
 auto 
  
 handler 
  
 = 
  
 []( 
 future<StatusOr<std 
 :: 
 string 
>>  
 f 
 ) 
  
 { 
  
 auto 
  
 id 
  
 = 
  
 f 
 . 
 get 
 (); 
  
 if 
  
 ( 
 ! 
 id 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 id 
 ). 
 status 
 (); 
  
 }; 
  
 for 
  
 ( 
 auto 
&  
 data 
  
 : 
  
 states 
 ) 
  
 { 
  
 google 
 :: 
 cloud 
 :: 
 pubsub 
 :: 
 samples 
 :: 
 State 
  
 state 
 ; 
  
 state 
 . 
 set_name 
 ( 
 data 
 . 
 first 
 ); 
  
 state 
 . 
 set_post_abbr 
 ( 
 data 
 . 
 second 
 ); 
  
 done 
 . 
 push_back 
 ( 
 publisher 
  
 . 
 Publish 
 ( 
 pubsub 
 :: 
 MessageBuilder 
 {} 
  
 . 
 SetData 
 ( 
 state 
 . 
 SerializeAsString 
 ()) 
  
 . 
 Build 
 ()) 
  
 . 
 then 
 ( 
 handler 
 )); 
  
 } 
  
 // Block until all messages are published. 
  
 for 
  
 ( 
 auto 
&  
 d 
  
 : 
  
 done 
 ) 
  
 d 
 . 
 get 
 (); 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub C# API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  using 
  
 Google.Cloud.PubSub.V1 
 ; 
 using 
  
 Google.Protobuf 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 using 
  
 System.Linq 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PublishProtoMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PublishProtoMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 IEnumerable<Utilities 
 . 
 State 
>  
 messageStates 
 ) 
  
 { 
  
 TopicName 
  
 topicName 
  
 = 
  
 TopicName 
 . 
 FromProjectTopic 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 PublisherClient 
  
 publisher 
  
 = 
  
 await 
  
 PublisherClient 
 . 
 CreateAsync 
 ( 
 topicName 
 ); 
  
 PublisherServiceApiClient 
  
 publishApi 
  
 = 
  
 PublisherServiceApiClient 
 . 
 Create 
 (); 
  
 var 
  
 topic 
  
 = 
  
 publishApi 
 . 
 GetTopic 
 ( 
 topicName 
 ); 
  
 int 
  
 publishedMessageCount 
  
 = 
  
 0 
 ; 
  
 var 
  
 publishTasks 
  
 = 
  
 messageStates 
 . 
 Select 
 ( 
 async 
  
 state 
  
 = 
>  
 { 
  
 try 
  
 { 
  
 string 
  
 messageId 
  
 = 
  
 null 
 ; 
  
 switch 
  
 ( 
 topic 
 . 
 SchemaSettings 
 . 
 Encoding 
 ) 
  
 { 
  
 case 
  
 Encoding 
 . 
 Binary 
 : 
  
 var 
  
 binaryMessage 
  
 = 
  
 state 
 . 
 ToByteString 
 (); 
  
 messageId 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 binaryMessage 
 ); 
  
 break 
 ; 
  
 case 
  
 Encoding 
 . 
 Json 
 : 
  
 var 
  
 jsonMessage 
  
 = 
  
 JsonFormatter 
 . 
 Default 
 . 
 Format 
 ( 
 state 
 ); 
  
 messageId 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 jsonMessage 
 ); 
  
 break 
 ; 
  
 } 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Published message {messageId}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 publishedMessageCount 
 ); 
  
 } 
  
 catch 
  
 ( 
 Exception 
  
 exception 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"An error occurred when publishing message {state}: {exception.Message}" 
 ); 
  
 } 
  
 }); 
  
 await 
  
 Task 
 . 
 WhenAll 
 ( 
 publishTasks 
 ); 
  
 // PublisherClient instance should be shutdown after use. 
  
 // The TimeSpan specifies for how long to attempt to publish locally queued messages. 
  
 await 
  
 publisher 
 . 
 ShutdownAsync 
 ( 
 TimeSpan 
 . 
 FromSeconds 
 ( 
 15 
 )); 
  
 return 
  
 publishedMessageCount 
 ; 
  
 } 
 } 
 

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
  
 "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" 
  
 statepb 
  
 "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas" 
  
 "google.golang.org/protobuf/encoding/protojson" 
  
 "google.golang.org/protobuf/proto" 
 ) 
 func 
  
 publishProtoMessages 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 state 
  
 := 
  
& statepb 
 . 
 State 
 { 
  
 Name 
 : 
  
 "Alaska" 
 , 
  
 PostAbbr 
 : 
  
 "AK" 
 , 
  
 } 
  
 // Get the topic encoding type. 
  
 req 
  
 := 
  
& pubsubpb 
 . 
 GetTopicRequest 
 { 
  
 Topic 
 : 
  
 fmt 
 . 
 Sprintf 
 ( 
 "projects/%s/topics/%s" 
 , 
  
 projectID 
 , 
  
 topicID 
 ), 
  
 } 
  
 t 
 , 
  
 err 
  
 := 
  
 client 
 . 
 TopicAdminClient 
 . 
 GetTopic 
 ( 
 ctx 
 , 
  
 req 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "got err in GetTopic: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 encoding 
  
 := 
  
 t 
 . 
 SchemaSettings 
 . 
 Encoding 
  
 var 
  
 msg 
  
 [] 
 byte 
  
 switch 
  
 encoding 
  
 { 
  
 case 
  
 pubsubpb 
 . 
 Encoding_BINARY 
 : 
  
 msg 
 , 
  
 err 
  
 = 
  
 proto 
 . 
 Marshal 
 ( 
 state 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "proto.Marshal err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 case 
  
 pubsubpb 
 . 
 Encoding_JSON 
 : 
  
 msg 
 , 
  
 err 
  
 = 
  
 protojson 
 . 
 Marshal 
 ( 
 state 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "protojson.Marshal err: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 default 
 : 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "invalid encoding: %v" 
 , 
  
 encoding 
 ) 
  
 } 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 msg 
 , 
  
 }) 
  
 _ 
 , 
  
 err 
  
 = 
  
 result 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "result.Get: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Published proto message with %#v encoding: %s\n" 
 , 
  
 encoding 
 , 
  
 string 
 ( 
 msg 
 )) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Java API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. TopicAdminClient 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.protobuf.util. JsonFormat 
 
 ; 
 import 
  
 com.google.pubsub.v1. Encoding 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 utilities.StateProto.State 
 ; 
 public 
  
 class 
 PublishProtobufMessagesExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 // Use a topic created with a proto schema. 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publishProtobufMessagesExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publishProtobufMessagesExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 InterruptedException 
  
 { 
  
  Encoding 
 
  
 encoding 
  
 = 
  
 null 
 ; 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 // Get the topic encoding type. 
  
 try 
  
 ( 
  TopicAdminClient 
 
  
 topicAdminClient 
  
 = 
  
  TopicAdminClient 
 
 . 
 create 
 ()) 
  
 { 
  
 encoding 
  
 = 
  
 topicAdminClient 
 . 
 getTopic 
 ( 
 topicName 
 ). 
 getSchemaSettings 
 (). 
 getEncoding 
 (); 
  
 } 
  
  Publisher 
 
  
 publisher 
  
 = 
  
 null 
 ; 
  
 // Instantiate a protoc-generated class defined in `us-states.proto`. 
  
 State 
  
 state 
  
 = 
  
 State 
 . 
 newBuilder 
 (). 
 setName 
 ( 
 "Alaska" 
 ). 
 setPostAbbr 
 ( 
 "AK" 
 ). 
 build 
 (); 
  
 block 
 : 
  
 try 
  
 { 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
 build 
 (); 
  
  PubsubMessage 
 
 . 
 Builder 
  
 message 
  
 = 
  
  PubsubMessage 
 
 . 
 newBuilder 
 (); 
  
 // Prepare an appropriately formatted message based on topic encoding. 
  
 switch 
  
 ( 
 encoding 
 ) 
  
 { 
  
 case 
  
 BINARY 
 : 
  
 message 
 . 
  setData 
 
 ( 
 state 
 . 
 toByteString 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Publishing a BINARY-formatted message:\n" 
  
 + 
  
 message 
 ); 
  
 break 
 ; 
  
 case 
  
 JSON 
 : 
  
 String 
  
 jsonString 
  
 = 
  
  JsonFormat 
 
 . 
 printer 
 (). 
 omittingInsignificantWhitespace 
 (). 
 print 
 ( 
 state 
 ); 
  
 message 
 . 
  setData 
 
 ( 
  ByteString 
 
 . 
  copyFromUtf8 
 
 ( 
 jsonString 
 )); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Publishing a JSON-formatted message:\n" 
  
 + 
  
 message 
 ); 
  
 break 
 ; 
  
 default 
 : 
  
 break 
  
 block 
 ; 
  
 } 
  
 // Publish the message. 
  
 ApiFuture<String> 
  
 future 
  
 = 
  
  publish 
 
er . 
  publish 
 
 ( 
 message 
 . 
 build 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Published message ID: " 
  
 + 
  
 future 
 . 
 get 
 ()); 
  
 } 
  
 finally 
  
 { 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 publisher 
 . 
  shutdown 
 
 (); 
  
 publisher 
 . 
  awaitTermination 
 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub PHP API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  use Google\Cloud\PubSub\PubSubClient; 
 use Google\Cloud\PubSub\V1\Encoding; 
 use Utilities\StateProto; 
 /** 
 * Publish a message using a protocol buffer schema. 
 * 
 * Relies on a proto message of the following form: 
 * ``` 
 * syntax = "proto3"; 
 * 
 * package utilities; 
 * 
 * message StateProto { 
 *   string name = 1; 
 *   string post_abbr = 2; 
 * } 
 * ``` 
 * 
 * @param string $projectId 
 * @param string $topicId 
 * @return void 
 */ 
 function publish_proto_messages($projectId, $topicId) 
 { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $messageData = new StateProto([ 
 'name' => 'Alaska', 
 'post_abbr' => 'AK', 
 ]); 
 $topic = $pubsub->topic($topicId); 
 // get the encoding type. 
 $topicInfo = $topic->info(); 
 $encoding = ''; 
 if (isset($topicInfo['schemaSettings']['encoding'])) { 
 $encoding = $topicInfo['schemaSettings']['encoding']; 
 } 
 // if encoding is not set, we can't continue. 
 if ($encoding === '') { 
 printf('Topic %s does not have schema enabled', $topicId); 
 return; 
 } 
 // If you are using gRPC, encoding may be an integer corresponding to an 
 // enum value on Google\Cloud\PubSub\V1\Encoding. 
 if (!is_string($encoding)) { 
 $encoding = Encoding::name($encoding); 
 } 
 $encodedMessageData = ''; 
 if ($encoding == 'BINARY') { 
 // encode as protobuf binary. 
 $encodedMessageData = $messageData->serializeToString(); 
 } else { 
 // encode as JSON. 
 $encodedMessageData = $messageData->serializeToJsonString(); 
 } 
 $topic->publish(['data' => $encodedMessageData]); 
 printf('Published message with %s encoding', $encoding); 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # topic_id = "your-topic-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 topic_admin 
  
 = 
  
 pubsub 
 . 
  topic_admin 
 
 publisher 
  
 = 
  
 pubsub 
 . 
  publisher 
 
  
 topic_id 
 state 
  
 = 
  
 Utilities 
 :: 
 StateProto 
 . 
  new 
 
  
 name 
 : 
  
 "Alaska" 
 , 
  
 post_abbr 
 : 
  
 "AK" 
 topic 
  
 = 
  
 topic_admin 
 . 
 get_topic 
  
 topic 
 : 
  
 pubsub 
 . 
 topic_path 
 ( 
 topic_id 
 ) 
 encoding 
  
 = 
  
 topic 
 . 
  schema_settings 
 
 . 
 encoding 
 case 
  
 encoding 
 when 
  
 :BINARY 
  
 publisher 
 . 
 publish 
  
 Utilities 
 :: 
 StateProto 
 . 
 encode 
 ( 
 state 
 ) 
  
 puts 
  
 "Published binary-encoded protobuf message." 
 when 
  
 :JSON 
  
 publisher 
 . 
 publish 
  
 Utilities 
 :: 
 StateProto 
 . 
 encode_json 
 ( 
 state 
 ) 
  
 puts 
  
 "Published JSON-encoded protobuf message." 
 else 
  
 raise 
  
 "No encoding specified in 
 #{ 
 topic 
 . 
 name 
 } 
 ." 
 end 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: