v1 Receive messages of Proto schema type (DEPRECATED)

(DEPRECATED) Receive messages of Proto schema type

Code sample

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Go API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "sync" 
  
 "time" 
  
 "cloud.google.com/go/pubsub" 
  
 statepb 
  
 "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas" 
  
 "google.golang.org/protobuf/encoding/protojson" 
  
 "google.golang.org/protobuf/proto" 
 ) 
 func 
  
 subscribeWithProtoSchema 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
 , 
  
 protoFile 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 // protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
  NewClient 
 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 // Create an instance of the message to be decoded (a single U.S. state). 
  
 state 
  
 := 
  
& statepb 
 . 
 State 
 {} 
  
 sub 
  
 := 
  
 client 
 . 
 Subscription 
 ( 
 subID 
 ) 
  
 ctx2 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 var 
  
 mu 
  
 sync 
 . 
 Mutex 
  
 sub 
 . 
 Receive 
 ( 
 ctx2 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 mu 
 . 
 Lock 
 () 
  
 defer 
  
 mu 
 . 
 Unlock 
 () 
  
 encoding 
  
 := 
  
 msg 
 . 
 Attributes 
 [ 
 "googclient_schemaencoding" 
 ] 
  
 if 
  
 encoding 
  
 == 
  
 "BINARY" 
  
 { 
  
 if 
  
 err 
  
 := 
  
 proto 
 . 
 Unmarshal 
 ( 
 msg 
 . 
 Data 
 , 
  
 state 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "proto.Unmarshal err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Printf 
 ( 
 "Received a binary-encoded message:\n%#v\n" 
 , 
  
 state 
 ) 
  
 } 
  
 else 
  
 if 
  
 encoding 
  
 == 
  
 "JSON" 
  
 { 
  
 if 
  
 err 
  
 := 
  
 protojson 
 . 
 Unmarshal 
 ( 
 msg 
 . 
 Data 
 , 
  
 state 
 ); 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "proto.Unmarshal err: %v\n" 
 , 
  
 err 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received a JSON-encoded message:\n%#v\n" 
 , 
  
 state 
 ) 
  
 } 
  
 else 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Unknown message type(%s), nacking\n" 
 , 
  
 encoding 
 ) 
  
 msg 
 . 
 Nack 
 () 
  
 return 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "%s is abbreviated as %s\n" 
 , 
  
 state 
 . 
 Name 
 , 
  
 state 
 . 
 PostAbbr 
 ) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 return 
  
 nil 
 } 
 

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries . For more information, see the Pub/Sub Ruby API reference documentation .

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  Pubsub 
 
 . 
 new 
 subscription 
  
 = 
  
 pubsub 
 . 
 subscription 
  
 subscription_id 
 subscriber 
  
 = 
  
 subscription 
 . 
 listen 
  
 do 
  
 | 
 received_message 
 | 
  
 encoding 
  
 = 
  
 received_message 
 . 
 attributes 
 [ 
 "googclient_schemaencoding" 
 ] 
  
 case 
  
 encoding 
  
 when 
  
 "BINARY" 
  
 state 
  
 = 
  
 Utilities 
 :: 
 StateProto 
 . 
 decode 
  
 received_message 
 . 
 data 
  
 puts 
  
 "Received a binary-encoded message: 
 \n 
 #{ 
 state 
 } 
 " 
  
 when 
  
 "JSON" 
  
 require 
  
 "json" 
  
 state 
  
 = 
  
 Utilities 
 :: 
 StateProto 
 . 
 decode_json 
  
 received_message 
 . 
 data 
  
 puts 
  
 "Received a JSON-encoded message: 
 \n 
 #{ 
 state 
 } 
 " 
  
 else 
  
 "Received a message with no encoding: 
 \n 
 #{ 
 received_message 
 . 
 message_id 
 } 
 " 
  
 end 
  
 received_message 
 . 
 acknowledge! 
 end 
 subscriber 
 . 
 start 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 subscriber 
 . 
 stop 
 . 
 wait! 
 

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser .

Create a Mobile Website
View Site in Mobile | Classic
Share by: