Receive messages from a pull subscription

This document describes how to receive messages from a pull subscription. You can use the Google Cloud console, the Google Cloud CLI, the client library, or the Pub/Sub API to create a pull subscription.

Before you begin

Required roles and permissions

To get the permissions that you need to pull messages from subscriptions and manage them, ask your administrator to grant you the Pub/Sub Subscriber ( roles/pubsub.subscriber ) IAM role on the project. For more information about granting roles, see Manage access to projects, folders, and organizations .

This predefined role contains the permissions required to pull messages from subscriptions and manage them. To see the exact permissions that are required, expand the Required permissionssection:

Required permissions

The following permissions are required to pull messages from subscriptions and manage them:

  • Pull from a subscription: pubsub.subscriptions.consume
  • Create a subscription: pubsub.subscriptions.create
  • Delete a subscription: pubsub.subscriptions.delete
  • Get a subscription: pubsub.subscriptions.get
  • List a subscription: pubsub.subscriptions.list
  • Update a subscription: pubsub.subscriptions.update
  • Attach a subscription to a topic: pubsub.topics.attachSubscription
  • Get the IAM policy for a subscription: pubsub.subscriptions.getIamPolicy
  • Configure the IAM policy for a subscription: pubsub.subscriptions.setIamPolicy
  • Grant the consume messages from a subscription permission on the pull subscription: pubsub.subscriptions.consume

You might also be able to get these permissions with custom roles or other predefined roles .

Pull a message from a subscription

The following samples demonstrate how to pull a message from a subscription using either the StreamingPull API or the Pull API.

StreamingPull API

To use the StreamingPull API, you must use a client library.

The Google Cloud console and Google Cloud CLI don't support the StreamingPull API.

StreamingPull and high-level client library code samples

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 auto 
  
 sample 
  
 = 
  
 []( 
 pubsub 
 :: 
 Subscriber 
  
 subscriber 
 ) 
  
 { 
  
 return 
  
 subscriber 
 . 
 Subscribe 
 ( 
  
 [&]( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 m 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 PleaseIgnoreThisSimplifiesTestingTheSamples 
 (); 
  
 }); 
 }; 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PullMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PullMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
  SubscriberClient 
 
 . 
  CreateAsync 
 
 ( 
 subscriptionName 
 ); 
  
 // SubscriberClient runs your message handle function on multiple 
  
 // threads to maximize throughput. 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
 CancellationToken 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 string 
  
 text 
  
 = 
  
 message 
 . 
 Data 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {text}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
 acknowledge 
  
 ? 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
  
 : 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Nack 
 
 ); 
  
 }); 
  
 // Run for 5 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 5000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
 CancellationToken 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "sync/atomic" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 pullMsgs 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 // Receive messages for 10 seconds, which simplifies testing. 
  
 // Comment this out in production, since `Receive` should 
  
 // be used as a long running operation. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 var 
  
 received 
  
 int32 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 _ 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got message: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 atomic 
 . 
 AddInt32 
 ( 
& received 
 , 
  
 1 
 ) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "sub.Receive: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received %d messages\n" 
 , 
  
 received 
 ) 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 SubscribeAsyncExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeAsyncExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeAsyncExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 // Start the subscriber. 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 // Allow the subscriber to run for 30s unless an unrecoverable error occurs. 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 // Shut down the subscriber after 30s. Stop receiving messages. 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // const timeout = 60; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 function 
  
 listenForMessages 
 ( 
 subscriptionNameOrId 
 , 
  
 timeout 
 ) 
  
 { 
  
 // References an existing subscription; if you are unsure if the 
  
 // subscription will exist, try the optimisticSubscribe sample. 
  
 const 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // Create an event handler to handle messages 
  
 let 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 const 
  
 messageHandler 
  
 = 
  
 message 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
 `Received message 
 ${ 
 message 
 . 
 id 
 } 
 :` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tData: 
 ${ 
 message 
 . 
 data 
 } 
 ` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tAttributes: 
 ${ 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
  
 messageCount 
  
 += 
  
 1 
 ; 
  
 // "Ack" (acknowledge receipt of) the message 
  
 message 
 . 
  ack 
 
 (); 
  
 }; 
  
 // Listen for new messages until timeout is hit 
  
 subscripti on 
 
 . 
  on 
 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 // Wait a while for the subscription to run. (Part of the sample only.) 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
 ${ 
 messageCount 
 } 
 message(s) received.` 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // const timeout = 60; 
 // Imports the Google Cloud client library 
 import 
  
 { 
 PubSub 
 , 
  
 Message 
 } 
  
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 function 
  
 listenForMessages 
 ( 
 subscriptionNameOrId 
 : 
  
 string 
 , 
  
 timeout 
 : 
  
 number 
 ) 
  
 { 
  
 // References an existing subscription; if you are unsure if the 
  
 // subscription will exist, try the optimisticSubscribe sample. 
  
 const 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // Create an event handler to handle messages 
  
 let 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 const 
  
 messageHandler 
  
 = 
  
 ( 
 message 
 : 
  
 Message 
 ) 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
 `Received message 
 ${ 
 message 
 . 
 id 
 } 
 :` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tData: 
 ${ 
 message 
 . 
 data 
 } 
 ` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tAttributes: 
 ${ 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
  
 messageCount 
  
 += 
  
 1 
 ; 
  
 // "Ack" (acknowledge receipt of) the message 
  
 message 
 . 
 ack 
 (); 
  
 }; 
  
 // Listen for new messages until timeout is hit 
  
 subscription 
 . 
 on 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 // Wait a while for the subscription to run. (Part of the sample only.) 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
 ${ 
 messageCount 
 } 
 message(s) received.` 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 concurrent.futures 
  
 import 
 TimeoutError 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 # Number of seconds the subscriber should listen for messages 
 # timeout = 5.0 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 # The `subscription_path` method creates a fully qualified identifier 
 # in the form `projects/{project_id}/subscriptions/{subscription_id}` 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 def 
  
 callback 
 ( 
 message 
 : 
 pubsub_v1 
 . 
 subscriber 
 . 
 message 
 . 
  Message 
 
 ) 
 - 
> None 
 : 
 print 
 ( 
 f 
 "Received 
 { 
 message 
 } 
 ." 
 ) 
 message 
 . 
  ack 
 
 () 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription_path 
 , 
 callback 
 = 
 callback 
 ) 
 print 
 ( 
 f 
 "Listening for messages on 
 { 
 subscription_path 
 } 
 .. 
 \n 
 " 
 ) 
 # Wrap subscriber in a 'with' block to automatically call close() when done. 
 with 
 subscriber 
 : 
 try 
 : 
 # When `timeout` is not set, result() will block indefinitely, 
 # unless an exception is encountered first. 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 TimeoutError 
 : 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 listener 
 . 
  start 
 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 listener 
 . 
 stop 
 . 
 wait! 
 

Retrieve custom attributes using the high-level client library

The following samples show how to pull messages asynchronously and retrieve the custom attributes from the metadata.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 auto 
  
 sample 
  
 = 
  
 []( 
 pubsub 
 :: 
 Subscriber 
  
 subscriber 
 ) 
  
 { 
  
 return 
  
 subscriber 
 . 
 Subscribe 
 ( 
  
 [&]( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Received message with attributes: 
 \n 
 " 
 ; 
  
 for 
  
 ( 
 auto 
&  
 kv 
  
 : 
  
 m 
 . 
 attributes 
 ()) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "  " 
 << 
 kv 
 . 
 first 
 << 
 ": " 
 << 
 kv 
 . 
 second 
 << 
 " 
 \n 
 " 
 ; 
  
 } 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 PleaseIgnoreThisSimplifiesTestingTheSamples 
 (); 
  
 }); 
 }; 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PullMessagesWithCustomAttributesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<List<PubsubMessage> 
>  
 PullMessagesWithCustomAttributesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
  SubscriberClient 
 
 . 
  CreateAsync 
 
 ( 
 subscriptionName 
 ); 
  
 var 
  
 messages 
  
 = 
  
 new 
  
 List<PubsubMessage> 
 (); 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
 CancellationToken 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 messages 
 . 
 Add 
 ( 
 message 
 ); 
  
 string 
  
 text 
  
 = 
  
 message 
 . 
 Data 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {text}" 
 ); 
  
 if 
  
 ( 
 message 
 . 
 Attributes 
  
 != 
  
 null 
 ) 
  
 { 
  
 foreach 
  
 ( 
 var 
  
 attribute 
  
 in 
  
 message 
 . 
 Attributes 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"{attribute.Key} = {attribute.Value}" 
 ); 
  
 } 
  
 } 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
 acknowledge 
  
 ? 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
  
 : 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Nack 
 
 ); 
  
 }); 
  
 // Run for 7 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 7000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
 CancellationToken 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 messages 
 ; 
  
 } 
 } 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 pullMsgsCustomAttributes 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 // Receive messages for 10 seconds, which simplifies testing. 
  
 // Comment this out in production, since `Receive` should 
  
 // be used as a long running operation. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 // Receive blocks until the context is cancelled or an error occurs. 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 _ 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got message :%q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 fmt 
 . 
 Fprintln 
 ( 
 w 
 , 
  
 "Attributes:" 
 ) 
  
 for 
  
 key 
 , 
  
 value 
  
 := 
  
 range 
  
 msg 
 . 
 Attributes 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "%s = %s\n" 
 , 
  
 key 
 , 
  
 value 
 ) 
  
 } 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "sub.Receive: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 SubscribeWithCustomAttributesExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeWithCustomAttributesExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeWithCustomAttributesExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 // Print message attributes. 
  
 message 
  
 . 
 getAttributesMap 
 () 
  
 . 
 forEach 
 (( 
 key 
 , 
  
 value 
 ) 
  
 - 
>  
 System 
 . 
 out 
 . 
 println 
 ( 
 key 
  
 + 
  
 " = " 
  
 + 
  
 value 
 )); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 // Start the subscriber. 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 // Allow the subscriber to run for 30s unless an unrecoverable error occurs. 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 // Shut down the subscriber after 30s. Stop receiving messages. 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // const timeout = 60; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 async 
  
 function 
  
 listenWithCustomAttributes 
 ( 
 subscriptionNameOrId 
 , 
  
 timeout 
 ) 
  
 { 
  
 // References an existing subscription, e.g. "my-subscription" 
  
 const 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // Create an event handler to handle messages 
  
 const 
  
 messageHandler 
  
 = 
  
 message 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
  
 `Received message: id 
 ${ 
 message 
 . 
 id 
 } 
 , data 
 ${ 
  
 message 
 . 
 data 
  
 } 
 , attributes: 
 ${ 
  JSON 
 
 . 
 stringify 
 ( 
 message 
 . 
 attributes 
 ) 
 } 
 ` 
 , 
  
 ); 
  
 // "Ack" (acknowledge receipt of) the message 
  
 message 
 . 
  ack 
 
 (); 
  
 }; 
  
 // Wait a while for the subscription to run. (Part of the sample only.) 
  
 subscripti on 
 
 . 
  on 
 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 concurrent.futures 
  
 import 
 TimeoutError 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 # Number of seconds the subscriber should listen for messages 
 # timeout = 5.0 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 def 
  
 callback 
 ( 
 message 
 : 
 pubsub_v1 
 . 
 subscriber 
 . 
 message 
 . 
  Message 
 
 ) 
 - 
> None 
 : 
 print 
 ( 
 f 
 "Received 
 { 
 message 
 . 
  data 
 
 !r} 
 ." 
 ) 
 if 
 message 
 . 
  attributes 
 
 : 
 print 
 ( 
 "Attributes:" 
 ) 
 for 
 key 
 in 
 message 
 . 
  attributes 
 
 : 
 value 
 = 
 message 
 . 
  attributes 
 
 . 
 get 
 ( 
 key 
 ) 
 print 
 ( 
 f 
 " 
 { 
 key 
 } 
 : 
 { 
 value 
 } 
 " 
 ) 
 message 
 . 
  ack 
 
 () 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription_path 
 , 
 callback 
 = 
 callback 
 ) 
 print 
 ( 
 f 
 "Listening for messages on 
 { 
 subscription_path 
 } 
 .. 
 \n 
 " 
 ) 
 # Wrap subscriber in a 'with' block to automatically call close() when done. 
 with 
 subscriber 
 : 
 try 
 : 
 # When `timeout` is not set, result() will block indefinitely, 
 # unless an exception is encountered first. 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 TimeoutError 
 : 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 unless 
  
 received_message 
 . 
 attributes 
 . 
 empty? 
  
 puts 
  
 "Attributes:" 
  
 received_message 
 . 
 attributes 
 . 
 each 
  
 do 
  
 | 
 key 
 , 
  
 value 
 | 
  
 puts 
  
 " 
 #{ 
 key 
 } 
 : 
 #{ 
 value 
 } 
 " 
  
 end 
  
 end 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 listener 
 . 
  start 
 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 listener 
 . 
 stop 
 . 
 wait! 
 

Handle errors using the high-level client library

The following samples show how to handle errors that arise when subscribing to messages.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
 using 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 future 
 ; 
 auto 
  
 sample 
  
 = 
  
 []( 
 pubsub 
 :: 
 Subscriber 
  
 subscriber 
 ) 
  
 { 
  
 return 
  
 subscriber 
  
 . 
 Subscribe 
 ([&]( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 m 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 PleaseIgnoreThisSimplifiesTestingTheSamples 
 (); 
  
 }) 
  
 // Setup an error handler for the subscription session 
  
 . 
 then 
 ([]( 
 future<google 
 :: 
 cloud 
 :: 
 Status 
>  
 f 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Subscription session result: " 
 << 
 f 
 . 
 get 
 () 
 << 
 " 
 \n 
 " 
 ; 
  
 }); 
 }; 
 

Go

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 pullMsgsError 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // If the service returns a non-retryable error, Receive returns that error after 
  
 // all of the outstanding calls to the handler have returned. 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 ctx 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got message: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "Receive: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 return 
  
 nil 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.api.gax.core. ExecutorProvider 
 
 ; 
 import 
  
 com.google.api.gax.core. InstantiatingExecutorProvider 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.common.util.concurrent.MoreExecutors 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 SubscribeWithErrorListenerExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeWithErrorListenerExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeWithErrorListenerExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 // Provides an executor service for processing messages. 
  
  ExecutorProvider 
 
  
 executorProvider 
  
 = 
  
  InstantiatingExecutorProvider 
 
 . 
 newBuilder 
 (). 
  setExecutorThreadCount 
 
 ( 
 4 
 ). 
 build 
 (); 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ) 
  
 . 
 setExecutorProvider 
 ( 
 executorProvider 
 ) 
  
 . 
 build 
 (); 
  
 // Listen for unrecoverable failures. 
  
 subscriber 
 . 
 addListener 
 ( 
  
 new 
  
  Subscriber 
 
 . 
 Listener 
 () 
  
 { 
  
 public 
  
 void 
  
 failed 
 ( 
  Subscriber 
 
 . 
 State 
  
 from 
 , 
  
 Throwable 
  
 failure 
 ) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Unrecoverable subscriber failure:" 
  
 + 
  
 failure 
 . 
 getStackTrace 
 ()); 
  
 } 
  
 }, 
  
 MoreExecutors 
 . 
 directExecutor 
 ()); 
  
 // Start the subscriber. 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 // Allow the subscriber to run for 30s unless an unrecoverable error occurs. 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 // Shut down the subscriber after 30s. Stop receiving messages. 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // const timeout = 10; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 function 
  
 listenForErrors 
 ( 
 subscriptionNameOrId 
 , 
  
 timeout 
 ) 
  
 { 
  
 // References an existing subscription 
  
 const 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // Create an event handler to handle messages 
  
 const 
  
 messageHandler 
  
 = 
  
 message 
  
 = 
>  
 { 
  
 // Do something with the message 
  
 console 
 . 
 log 
 ( 
 `Message: 
 ${ 
 message 
 } 
 ` 
 ); 
  
 // "Ack" (acknowledge receipt of) the message 
  
 message 
 . 
  ack 
 
 (); 
  
 }; 
  
 // Create an event handler to handle errors 
  
 const 
  
 errorHandler 
  
 = 
  
 error 
  
 = 
>  
 { 
  
 // Do something with the error 
  
 console 
 . 
 error 
 ( 
 `ERROR: 
 ${ 
 error 
 } 
 ` 
 ); 
  
 throw 
  
 error 
 ; 
  
 }; 
  
 // Listen for new messages/errors until timeout is hit 
  
 subscripti on 
 
 . 
  on 
 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 subscripti on 
 
 . 
  on 
 
 ( 
 'error' 
 , 
  
 errorHandler 
 ); 
  
 // Wait a while for the subscription to run. (Part of the sample only.) 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 subscription 
 . 
 removeListener 
 ( 
 'error' 
 , 
  
 errorHandler 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 # Number of seconds the subscriber should listen for messages 
 # timeout = 5.0 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 def 
  
 callback 
 ( 
 message 
 : 
 pubsub_v1 
 . 
 subscriber 
 . 
 message 
 . 
  Message 
 
 ) 
 - 
> None 
 : 
 print 
 ( 
 f 
 "Received 
 { 
 message 
 } 
 ." 
 ) 
 message 
 . 
  ack 
 
 () 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription_path 
 , 
 callback 
 = 
 callback 
 ) 
 print 
 ( 
 f 
 "Listening for messages on 
 { 
 subscription_path 
 } 
 .. 
 \n 
 " 
 ) 
 # Wrap subscriber in a 'with' block to automatically call close() when done. 
 with 
 subscriber 
 : 
 # When `timeout` is not set, result() will block indefinitely, 
 # unless an exception is encountered first. 
 try 
 : 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 Exception 
 as 
 e 
 : 
 print 
 ( 
 f 
 "Listening for messages on 
 { 
 subscription_path 
 } 
 threw an exception: 
 { 
 e 
 } 
 ." 
 ) 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 

Ruby

The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2 . To see a list of v1 code samples, see the deprecated code samples .

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Go API reference documentation .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 # Propagate exception from child threads to the main thread as soon as it is 
 # raised. Exceptions happened in the callback thread are collected in the 
 # callback thread pool and do not propagate to the main thread 
 Thread 
 . 
 abort_on_exception 
  
 = 
  
 true 
 begin 
  
 listener 
 . 
  start 
 
  
 # Let the main thread sleep for 60 seconds so the thread for listening 
  
 # messages does not quit 
  
 sleep 
  
 60 
  
 listener 
 . 
 stop 
 . 
 wait! 
 rescue 
  
 StandardError 
  
 = 
>  
 e 
  
 puts 
  
 "Exception 
 #{ 
 e 
 . 
 inspect 
 } 
 : 
 #{ 
 e 
 . 
 message 
 } 
 " 
  
 raise 
  
 "Stopped listening for messages." 
 end 
 

Unary pull

To use the Unary API, you can use the Google Cloud console, Google Cloud CLI, or a client library.

Considerations

Pub/Sub delivers a list of messages. If the list has multiple messages, Pub/Sub orders the messages with the same ordering key . The following are some important caveats:

  • Setting a value for max_messages in the request does not guarantee that max_messages are returned, even if there are that many messages in the backlog. The Pub/Sub Pull API might return fewer than max_messages in order to reduce the delivery latency for messages that are readily available to be delivered.

  • A pull response that comes with 0 messages must not be used as an indicator that there are no messages in the backlog. It's possible to get a response with 0 messages and have a subsequent request that returns messages.

Console

  1. In the Google Cloud console, go to the Pub/Sub subscriptionspage.

    Go to Subscriptions

  2. Click the name of the subscription that you want to pull from.

  3. In the Messagestab, click Pull.

You should see the messages that you published to this subscription and the time they were published.

When using the Google Cloud console, an individual pull for a low message volume can often return zero messages. If you don't see messages, click Pullmultiple times to issue multiple pull requests. This is not an issue with the Pub/Sub Client Libraries .

gcloud

To pull a message from a subscription, run the gcloud pubsub subscriptions pull command. The gcloud CLI prints the message to the command line.

gcloud  
pubsub  
subscriptions  
pull  
 SUBSCRIPTION_NAME 
  
--auto-ack

Replace:

  • SUBSCRIPTION_NAME : The name of the subscription from which you want to pull messages.

Client Libraries

Here's some sample code to pull and acknowledge a fixed number of messages.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C++ API reference documentation .

  []( 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 :: 
 Subscriber 
  
 subscriber 
 ) 
  
 { 
  
 auto 
  
 response 
  
 = 
  
 subscriber 
 . 
 Pull 
 (); 
  
 if 
  
 ( 
 ! 
 response 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 response 
 ). 
 status 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 response 
 - 
> message 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 response 
 - 
> handler 
 ). 
 ack 
 (); 
 } 
 

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub C# API reference documentation .

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
  Grpc.Core 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Linq 
 ; 
 using 
  
 System.Threading 
 ; 
 public 
  
 class 
  
 PullMessagesSyncSample 
 { 
  
 public 
  
 int 
  
 PullMessagesSync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberServiceApiClient 
 
  
 subscriberClient 
  
 = 
  
  SubscriberServiceApiClient 
 
 . 
  Create 
 
 (); 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 try 
  
 { 
  
 // Pull messages from server, 
  
 // allowing an immediate response if there are no messages. 
  
  PullResponse 
 
  
 response 
  
 = 
  
 subscriberClient 
 . 
  Pull 
 
 ( 
 subscriptionName 
 , 
  
 maxMessages 
 : 
  
 20 
 ); 
  
 // Print out each received message. 
  
 foreach 
  
 ( 
  ReceivedMessage 
 
  
 msg 
  
 in 
  
 response 
 . 
  ReceivedMessages 
 
 ) 
  
 { 
  
 string 
  
 text 
  
 = 
  
 msg 
 . 
 Message 
 . 
  Data 
 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {msg.Message.MessageId}: {text}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 } 
  
 // If acknowledgement required, send to server. 
  
 if 
  
 ( 
 acknowledge 
 && 
 messageCount 
 > 
 0 
 ) 
  
 { 
  
 subscriberClient 
 . 
  Acknowledge 
 
 ( 
 subscriptionName 
 , 
  
 response 
 . 
  ReceivedMessages 
 
 . 
 Select 
 ( 
 msg 
  
 = 
>  
 msg 
 . 
  AckId 
 
 )); 
  
 } 
  
 } 
  
 catch 
  
 ( 
  RpcException 
 
  
 ex 
 ) 
  
 when 
  
 ( 
 ex 
 . 
  Status 
 
 . 
  StatusCode 
 
  
 == 
  
  StatusCode 
 
 . 
  Unavailable 
 
 ) 
  
 { 
  
 // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription. 
  
 } 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Java API reference documentation .

  import 
  
 com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub 
 ; 
 import 
  
 com.google.cloud.pubsub.v1.stub. SubscriberStub 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1.stub.SubscriberStubSettings 
 ; 
 import 
  
 com.google.pubsub.v1. AcknowledgeRequest 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PullRequest 
 
 ; 
 import 
  
 com.google.pubsub.v1. PullResponse 
 
 ; 
 import 
  
 com.google.pubsub.v1. ReceivedMessage 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.ArrayList 
 ; 
 import 
  
 java.util.List 
 ; 
 public 
  
 class 
 SubscribeSyncExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 Integer 
  
 numOfMessages 
  
 = 
  
 10 
 ; 
  
 subscribeSyncExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 , 
  
 numOfMessages 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeSyncExample 
 ( 
  
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 , 
  
 Integer 
  
 numOfMessages 
 ) 
  
 throws 
  
 IOException 
  
 { 
  
 SubscriberStubSettings 
  
 subscriberStubSettings 
  
 = 
  
 SubscriberStubSettings 
 . 
 newBuilder 
 () 
  
 . 
 setTransportChannelProvider 
 ( 
  
 SubscriberStubSettings 
 . 
 defaultGrpcTransportProviderBuilder 
 () 
  
 . 
 setMaxInboundMessageSize 
 ( 
 20 
  
 * 
  
 1024 
  
 * 
  
 1024 
 ) 
  
 // 20MB (maximum message size). 
  
 . 
 build 
 ()) 
  
 . 
 build 
 (); 
  
 try 
  
 ( 
  SubscriberStub 
 
  
 subscriber 
  
 = 
  
 GrpcSubscriberStub 
 . 
 create 
 ( 
 subscriberStubSettings 
 )) 
  
 { 
  
 String 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 format 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  PullRequest 
 
  
 pullRequest 
  
 = 
  
  PullRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setMaxMessages 
 ( 
 numOfMessages 
 ) 
  
 . 
 setSubscription 
 ( 
 subscriptionName 
 ) 
  
 . 
 build 
 (); 
  
 // Use pullCallable().futureCall to asynchronously perform this operation. 
  
  PullResponse 
 
  
 pullResponse 
  
 = 
  
 subscriber 
 . 
 pullCallable 
 (). 
 call 
 ( 
 pullRequest 
 ); 
  
 // Stop the program if the pull response is empty to avoid acknowledging 
  
 // an empty list of ack IDs. 
  
 if 
  
 ( 
 pullResponse 
 . 
  getReceivedMessagesList 
 
 (). 
 isEmpty 
 ()) 
  
 { 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "No message was pulled. Exiting." 
 ); 
  
 return 
 ; 
  
 } 
  
 List<String> 
  
 ackIds 
  
 = 
  
 new 
  
 ArrayList 
<> (); 
  
 for 
  
 ( 
  ReceivedMessage 
 
  
 message 
  
 : 
  
 pullResponse 
 . 
  getReceivedMessagesList 
 
 ()) 
  
 { 
  
 // Handle received message 
  
 // ... 
  
 ackIds 
 . 
 add 
 ( 
 message 
 . 
 getAckId 
 ()); 
  
 } 
  
 // Acknowledge received messages. 
  
  AcknowledgeRequest 
 
  
 acknowledgeRequest 
  
 = 
  
  AcknowledgeRequest 
 
 . 
 newBuilder 
 () 
  
 . 
 setSubscription 
 ( 
 subscriptionName 
 ) 
  
 . 
 addAllAckIds 
 ( 
 ackIds 
 ) 
  
 . 
 build 
 (); 
  
 // Use acknowledgeCallable().futureCall to asynchronously perform this operation. 
  
 subscriber 
 . 
 acknowledgeCallable 
 (). 
 call 
 ( 
 acknowledgeRequest 
 ); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 pullResponse 
 . 
  getReceivedMessagesList 
 
 ()); 
  
 } 
  
 } 
 } 
 

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const projectId = 'YOUR_PROJECT_ID'; 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // Imports the Google Cloud client library. v1 is for the lower level 
 // proto access. 
 const 
  
 { 
 v1 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use. 
 const 
  
 subClient 
  
 = 
  
 new 
  
 v1 
 . 
  SubscriberClient 
 
 (); 
 async 
  
 function 
  
 synchronousPull 
 ( 
 projectId 
 , 
  
 subscriptionNameOrId 
 ) 
  
 { 
  
 // The low level API client requires a name only. 
  
 const 
  
 formattedSubscription 
  
 = 
  
 subscriptionNameOrId 
 . 
 indexOf 
 ( 
 '/' 
 ) 
  
> = 
  
 0 
  
 ? 
  
 subscriptionNameOrId 
  
 : 
  
 subClient 
 . 
 subscriptionPath 
 ( 
 projectId 
 , 
  
 subscriptionNameOrId 
 ); 
  
 // The maximum number of messages returned for this request. 
  
 // Pub/Sub may return fewer than the number specified. 
  
 const 
  
 request 
  
 = 
  
 { 
  
 subscription 
 : 
  
 formattedSubscription 
 , 
  
 maxMessages 
 : 
  
 10 
 , 
  
 }; 
  
 // The subscriber pulls a specified number of messages. 
  
 const 
  
 [ 
 response 
 ] 
  
 = 
  
 await 
  
 subClient 
 . 
 pull 
 ( 
 request 
 ); 
  
 // Process the messages. 
  
 const 
  
 ackIds 
  
 = 
  
 []; 
  
 for 
  
 ( 
 const 
  
 message 
  
 of 
  
 response 
 . 
 receivedMessages 
  
 || 
  
 []) 
  
 { 
  
 console 
 . 
 log 
 ( 
 `Received message: 
 ${ 
 message 
 . 
 message 
 . 
 data 
 } 
 ` 
 ); 
  
 if 
  
 ( 
 message 
 . 
 ackId 
 ) 
  
 { 
  
 ackIds 
 . 
 push 
 ( 
 message 
 . 
 ackId 
 ); 
  
 } 
  
 } 
  
 if 
  
 ( 
 ackIds 
 . 
  length 
 
  
 !== 
  
 0 
 ) 
  
 { 
  
 // Acknowledge all of the messages. You could also acknowledge 
  
 // these individually, but this is more efficient. 
  
 const 
  
 ackRequest 
  
 = 
  
 { 
  
 subscription 
 : 
  
 formattedSubscription 
 , 
  
 ackIds 
 : 
  
 ackIds 
 , 
  
 }; 
  
 await 
  
 subClient 
 . 
 acknowledge 
 ( 
 ackRequest 
 ); 
  
 } 
  
 console 
 . 
 log 
 ( 
 'Done.' 
 ); 
 } 
 

PHP

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Node.js API reference documentation .

  use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Pulls all Pub/Sub messages for a subscription. 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $subscriptionName  The Pub/Sub subscription name. 
 */ 
 function pull_messages($projectId, $subscriptionName) 
 { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $subscription = $pubsub->subscription($subscriptionName); 
 foreach ($subscription->pull() as $message) { 
 printf('Message: %s' . PHP_EOL, $message->data()); 
 // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times. 
 $subscription->acknowledge($message); 
 } 
 } 
 

Ruby

The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3 . To see a list of Ruby v2 code samples, see the deprecated code samples .

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Ruby API reference documentation .

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 subscriber 
 . 
 pull 
 ( 
 immediate 
 : 
  
 false 
 ) 
 . 
 each 
  
 do 
  
 | 
 message 
 | 
  
 puts 
  
 "Message pulled: 
 #{ 
 message 
 . 
 data 
 } 
 " 
  
 message 
 . 
  acknowledge! 
 
 end 
 

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

  { 
  
 "returnImmediately" 
 : 
  
 "false" 
 , 
  
 "maxMessages" 
 : 
  
 "1" 
 } 
 

Response:

200 OK

  { 
  
 "receivedMessages" 
 : 
  
 [{ 
  
 "ackId" 
 : 
  
 "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..." 
 , 
  
 "message" 
 : 
  
 { 
  
 "data" 
 : 
  
 "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==" 
 , 
  
 "messageId" 
 : 
  
 "19917247034" 
  
 } 
  
 }] 
 } 
 

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

  { 
  
 "ackIds" 
 : 
  
 [ 
  
 "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..." 
  
 ] 
 } 
 

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries . For more information, see the Pub/Sub Python API reference documentation .

  from 
  
 google.api_core 
  
 import 
  retry 
 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 NUM_MESSAGES 
 = 
 3 
 # Wrap the subscriber in a 'with' block to automatically call close() to 
 # close the underlying gRPC channel when done. 
 with 
 subscriber 
 : 
 # The subscriber pulls a specific number of messages. The actual 
 # number of messages pulled may be smaller than max_messages. 
 response 
 = 
 subscriber 
 . 
  pull 
 
 ( 
 request 
 = 
 { 
 "subscription" 
 : 
 subscription_path 
 , 
 "max_messages" 
 : 
 NUM_MESSAGES 
 }, 
 retry 
 = 
  retry 
 
 . 
 Retry 
 ( 
 deadline 
 = 
 300 
 ), 
 ) 
 if 
 len 
 ( 
 response 
 . 
 received_messages 
 ) 
 == 
 0 
 : 
 return 
 ack_ids 
 = 
 [] 
 for 
 received_message 
 in 
 response 
 . 
 received_messages 
 : 
 print 
 ( 
 f 
 "Received: 
 { 
 received_message 
 . 
 message 
 . 
  data 
 
 } 
 ." 
 ) 
 ack_ids 
 . 
 append 
 ( 
 received_message 
 . 
  ack_id 
 
 ) 
 # Acknowledges the received messages so they will not be sent again. 
 subscriber 
 . 
  acknowledge 
 
 ( 
 request 
 = 
 { 
 "subscription" 
 : 
 subscription_path 
 , 
 "ack_ids" 
 : 
 ack_ids 
 } 
 ) 
 print 
 ( 
 f 
 "Received and acknowledged 
 { 
 len 
 ( 
 response 
 . 
 received_messages 
 ) 
 } 
 messages from 
 { 
 subscription_path 
 } 
 ." 
 ) 
 

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: