Publish and receive messages in Pub/Sub by using a client library

The Pub/Sub service allows applications to exchange messages reliably, quickly, and asynchronously. The following is the sequence of events:

  1. A producer of data publishes a message to a Pub/Sub topic.
  2. A subscriber client creates a subscription to that topic and consumes messages from the subscription.

You can set up a Pub/Sub environment by using any of the following methods: Google Cloud console, Cloud Shell, client libraries, or REST APIs. This page shows you how to get started publishing messages with Pub/Sub using client libraries.

Pub/Sub offers a high-level and a low-level auto-generated client library. By default, as in this quickstart, we recommend the high-level client library.


To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me :

Guide me


Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity .

  4. To initialize the gcloud CLI, run the following command:

    gcloud  
    init
  5. Create or select a Google Cloud project .

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID 
      

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID 
      

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project .

  7. Enable the Pub/Sub API:

    gcloud  
    services  
     enable 
      
    pubsub.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud  
    auth  
    application-default  
    login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity .

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud  
    projects  
    add-iam-policy-binding  
     PROJECT_ID 
      
    --member = 
     "user: USER_IDENTIFIER 
    " 
      
    --role = 
     ROLE 
    

    Replace the following:

    • PROJECT_ID : your project ID.
    • USER_IDENTIFIER : the identifier for your user account—for example, myemail@example.com .
    • ROLE : the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity .

  12. To initialize the gcloud CLI, run the following command:

    gcloud  
    init
  13. Create or select a Google Cloud project .

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID 
      

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID 
      

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project .

  15. Enable the Pub/Sub API:

    gcloud  
    services  
     enable 
      
    pubsub.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud  
    auth  
    application-default  
    login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity .

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud  
    projects  
    add-iam-policy-binding  
     PROJECT_ID 
      
    --member = 
     "user: USER_IDENTIFIER 
    " 
      
    --role = 
     ROLE 
    

    Replace the following:

    • PROJECT_ID : your project ID.
    • USER_IDENTIFIER : the identifier for your user account—for example, myemail@example.com .
    • ROLE : the IAM role that you grant to your user account.

Install the client libraries

The following samples show you how to install the client libraries:

Python

For more on setting up your Python development environment, refer to Python Development Environment Setup Guide .

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

C++

For more information about installing the C++ library, see the GitHub README

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get cloud.google.com/go/pubsub

Java

If you are using Maven , add the following to your pom.xml file. For more information about BOMs, see The Google Cloud Platform Libraries BOM .

 <dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>26.66.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

</dependencies> 

If you are using Gradle , add the following to your dependencies:

 implementation platform('com.google.cloud:libraries-bom:26.66.0')

implementation 'com.google.cloud:google-cloud-pubsub' 

If you are using sbt , add the following to your dependencies:

 libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.141.3" 

If you're using Visual Studio Code, IntelliJ, or Eclipse, you can add client libraries to your project using the following IDE plugins:

The plugins provide additional functionality, such as key management for service accounts. Refer to each plugin's documentation for details.

Node.js

npm install @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Ruby

gem install google-cloud-pubsub

Create a topic and a subscription

After you create a topic, you can subscribe or publish to it.

Use the following gcloud pubsub topics create command to create a topic named my-topic . Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.

gcloud  
pubsub  
topics  
create  
my-topic

Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud  
pubsub  
subscriptions  
create  
my-sub  
--topic  
my-topic

Publish messages

Before running the following samples, make sure you uncomment and fill in any of the required values that are marked in the code. This is required to link the sample to your project and Pub/Sub resources that you created earlier.

Use my-topic for your topic ID.

Python

  from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # topic_id = "your-topic-id" 
 publisher 
 = 
 pubsub_v1 
 . 
  PublisherClient 
 
 () 
 # The `topic_path` method creates a fully qualified identifier 
 # in the form `projects/{project_id}/topics/{topic_id}` 
 topic_path 
 = 
 publisher 
 . 
 topic_path 
 ( 
 project_id 
 , 
 topic_id 
 ) 
 for 
 n 
 in 
 range 
 ( 
 1 
 , 
 10 
 ): 
 data_str 
 = 
 f 
 "Message number 
 { 
 n 
 } 
 " 
 # Data must be a bytestring 
 data 
 = 
 data_str 
 . 
 encode 
 ( 
 "utf-8" 
 ) 
 # When you publish a message, the client returns a future. 
 future 
 = 
  publish 
 
er . 
  publish 
 
 ( 
 topic_path 
 , 
 data 
 ) 
 print 
 ( 
 future 
 . 
 result 
 ()) 
 print 
 ( 
 f 
 "Published messages to 
 { 
 topic_path 
 } 
 ." 
 ) 
 

C++

  #include 
  
 "google/cloud/pubsub/publisher.h" 
 #include <iostream> 
 int 
  
 main 
 ( 
 int 
  
 argc 
 , 
  
 char 
 * 
  
 argv 
 []) 
  
 try 
  
 { 
  
 if 
  
 ( 
 argc 
  
 != 
  
 3 
 ) 
  
 { 
  
 std 
 :: 
 cerr 
 << 
 "Usage: " 
 << 
 argv 
 [ 
 0 
 ] 
 << 
 " <project-id> <topic-id> 
 \n 
 " 
 ; 
  
 return 
  
 1 
 ; 
  
 } 
  
 std 
 :: 
 string 
  
 const 
  
 project_id 
  
 = 
  
 argv 
 [ 
 1 
 ]; 
  
 std 
 :: 
 string 
  
 const 
  
 topic_id 
  
 = 
  
 argv 
 [ 
 2 
 ]; 
  
 // Create a namespace alias to make the code easier to read. 
  
 namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
  
 auto 
  
 publisher 
  
 = 
  
 pubsub 
 :: 
 Publisher 
 ( 
  
 pubsub 
 :: 
 MakePublisherConnection 
 ( 
 pubsub 
 :: 
 Topic 
 ( 
 project_id 
 , 
  
 topic_id 
 ))); 
  
 auto 
  
 id 
  
 = 
  
 publisher 
  
 . 
 Publish 
 ( 
 pubsub 
 :: 
 MessageBuilder 
 {}. 
 SetData 
 ( 
 "Hello World!" 
 ). 
 Build 
 ()) 
  
 . 
 get 
 (); 
  
 if 
  
 ( 
 ! 
 id 
 ) 
  
 throw 
  
 std 
 :: 
 move 
 ( 
 id 
 ). 
 status 
 (); 
  
 std 
 :: 
 cout 
 << 
 "Hello World published with id=" 
 << 
 * 
 id 
 << 
 " 
 \n 
 " 
 ; 
  
 return 
  
 0 
 ; 
 } 
  
 catch 
  
 ( 
 google 
 :: 
 cloud 
 :: 
 Status 
  
 const 
&  
 status 
 ) 
  
 { 
  
 std 
 :: 
 cerr 
 << 
 "google::cloud::Status thrown: " 
 << 
 status 
 << 
 " 
 \n 
 " 
 ; 
  
 return 
  
 1 
 ; 
 } 
 

C#

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Collections.Generic 
 ; 
 using 
  
 System.Linq 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PublishMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PublishMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 topicId 
 , 
  
 IEnumerable<string> 
  
 messageTexts 
 ) 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
  FromProjectTopic 
 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  PublisherClient 
 
  
 publisher 
  
 = 
  
 await 
  
  PublisherClient 
 
 . 
  CreateAsync 
 
 ( 
 topicName 
 ); 
  
 int 
  
 publishedMessageCount 
  
 = 
  
 0 
 ; 
  
 var 
  
 publishTasks 
  
 = 
  
 messageTexts 
 . 
 Select 
 ( 
 async 
  
 text 
  
 = 
>  
 { 
  
 try 
  
 { 
  
 string 
  
 message 
  
 = 
  
 await 
  
 publisher 
 . 
 PublishAsync 
 ( 
 text 
 ); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Published message {message}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 publishedMessageCount 
 ); 
  
 } 
  
 catch 
  
 ( 
 Exception 
  
 exception 
 ) 
  
 { 
  
 Console 
 . 
 WriteLine 
 ( 
 $"An error occurred when publishing message {text}: {exception.Message}" 
 ); 
  
 } 
  
 }); 
  
 await 
  
 Task 
 . 
 WhenAll 
 ( 
 publishTasks 
 ); 
  
 return 
  
 publishedMessageCount 
 ; 
  
 } 
 } 
 

Go

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 publish 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 topicID 
 , 
  
 msg 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // topicID := "my-topic" 
  
 // msg := "Hello World" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub: NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Publisher can be passed a topic ID (e.g. "my-topic") or 
  
 // a fully qualified name (e.g. "projects/my-project/topics/my-topic"). 
  
 // If a topic ID is provided, the project ID from the client is used. 
  
 // Reuse this publisher for all publish calls to send messages in batches. 
  
 publisher 
  
 := 
  
 client 
 . 
 Publisher 
 ( 
 topicID 
 ) 
  
 result 
  
 := 
  
 publisher 
 . 
 Publish 
 ( 
 ctx 
 , 
  
& pubsub 
 . 
 Message 
 { 
  
 Data 
 : 
  
 [] 
 byte 
 ( 
 msg 
 ), 
  
 }) 
  
 // Block until the result is returned and a server-generated 
  
 // ID is returned for the published message. 
  
 id 
 , 
  
 err 
  
 := 
  
 result 
 . 
 Get 
 ( 
 ctx 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub: result.Get: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Published a message; msg ID: %v\n" 
 , 
  
 id 
 ) 
  
 return 
  
 nil 
 } 
 

Java

  import 
  
 com.google.api.core. ApiFuture 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Publisher 
 
 ; 
 import 
  
 com.google.protobuf. ByteString 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 com.google.pubsub.v1. TopicName 
 
 ; 
 import 
  
 java.io.IOException 
 ; 
 import 
  
 java.util.concurrent.ExecutionException 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 public 
  
 class 
 PublisherExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 topicId 
  
 = 
  
 "your-topic-id" 
 ; 
  
 publisherExample 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 publisherExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 topicId 
 ) 
  
 throws 
  
 IOException 
 , 
  
 ExecutionException 
 , 
  
 InterruptedException 
  
 { 
  
  TopicName 
 
  
 topicName 
  
 = 
  
  TopicName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 topicId 
 ); 
  
  Publisher 
 
  
 publisher 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 // Create a publisher instance with default settings bound to the topic 
  
 publisher 
  
 = 
  
  Publisher 
 
 . 
 newBuilder 
 ( 
 topicName 
 ). 
 build 
 (); 
  
 String 
  
 message 
  
 = 
  
 "Hello World!" 
 ; 
  
  ByteString 
 
  
 data 
  
 = 
  
  ByteString 
 
 . 
  copyFromUtf8 
 
 ( 
 message 
 ); 
  
  PubsubMessage 
 
  
 pubsubMessage 
  
 = 
  
  PubsubMessage 
 
 . 
 newBuilder 
 (). 
  setData 
 
 ( 
 data 
 ). 
 build 
 (); 
  
 // Once published, returns a server-assigned message id (unique within the topic) 
  
 ApiFuture<String> 
  
 messageIdFuture 
  
 = 
  
  publish 
 
er . 
  publish 
 
 ( 
 pubsubMessage 
 ); 
  
 String 
  
 messageId 
  
 = 
  
 messageIdFuture 
 . 
 get 
 (); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Published message ID: " 
  
 + 
  
 messageId 
 ); 
  
 } 
  
 finally 
  
 { 
  
 if 
  
 ( 
 publisher 
  
 != 
  
 null 
 ) 
  
 { 
  
 // When finished with the publisher, shutdown to free up resources. 
  
 publisher 
 . 
  shutdown 
 
 (); 
  
 publisher 
 . 
  awaitTermination 
 
 ( 
 1 
 , 
  
 TimeUnit 
 . 
 MINUTES 
 ); 
  
 } 
  
 } 
  
 } 
 } 
 

Node.js

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 async 
  
 function 
  
 publishMessage 
 ( 
 topicNameOrId 
 , 
  
 data 
 ) 
  
 { 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
  from 
 
 ( 
 data 
 ); 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 topic 
  
 = 
  
 pubSubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 ); 
  
 try 
  
 { 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 topic 
 . 
  publishMessage 
 
 ({ 
 data 
 : 
  
 dataBuffer 
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
  
 } 
  
 catch 
  
 ( 
 error 
 ) 
  
 { 
  
 console 
 . 
 error 
 ( 
 `Received error while publishing: 
 ${ 
 error 
 . 
 message 
 } 
 ` 
 ); 
  
 process 
 . 
 exitCode 
  
 = 
  
 1 
 ; 
  
 } 
 } 
 

Node.js

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; 
 // const data = JSON.stringify({foo: 'bar'}); 
 // Imports the Google Cloud client library 
 import 
  
 { 
 PubSub 
 } 
  
 from 
  
 '@google-cloud/pubsub' 
 ; 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
 PubSub 
 (); 
 async 
  
 function 
  
 publishMessage 
 ( 
 topicNameOrId 
 : 
  
 string 
 , 
  
 data 
 : 
  
 string 
 ) 
  
 { 
  
 // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) 
  
 const 
  
 dataBuffer 
  
 = 
  
 Buffer 
 . 
 from 
 ( 
 data 
 ); 
  
 // Cache topic objects (publishers) and reuse them. 
  
 const 
  
 topic 
  
 = 
  
 pubSubClient 
 . 
 topic 
 ( 
 topicNameOrId 
 ); 
  
 try 
  
 { 
  
 const 
  
 messageId 
  
 = 
  
 await 
  
 topic 
 . 
 publishMessage 
 ({ 
 data 
 : 
  
 dataBuffer 
 }); 
  
 console 
 . 
 log 
 ( 
 `Message 
 ${ 
 messageId 
 } 
 published.` 
 ); 
  
 } 
  
 catch 
  
 ( 
 error 
 ) 
  
 { 
  
 console 
 . 
 error 
 ( 
  
 `Received error while publishing: 
 ${ 
 ( 
 error 
  
 as 
  
 Error 
 ). 
 message 
 } 
 ` 
 , 
  
 ); 
  
 process 
 . 
 exitCode 
  
 = 
  
 1 
 ; 
  
 } 
 } 
 

PHP

  use Google\Cloud\PubSub\MessageBuilder; 
 use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Publishes a message for a Pub/Sub topic. 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $topicName  The Pub/Sub topic name. 
 * @param string $message  The message to publish. 
 */ 
 function publish_message($projectId, $topicName, $message) 
 { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $topic = $pubsub->topic($topicName); 
 $topic->publish((new MessageBuilder)->setData($message)->build()); 
 print('Message published' . PHP_EOL); 
 } 
 

Ruby

  # topic_id = "your-topic-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 publisher 
  
 = 
  
 pubsub 
 . 
  publisher 
 
  
 topic_id 
 publisher 
 . 
 publish 
  
 "This is a test message." 
 puts 
  
 "Message published." 
 

Receive messages

Set up a subscriber to pull the messages you just published. Every subscriber must acknowledge each message within a configurable time window. Unacknowledged messages are redelivered. Note that Pub/Sub occasionally delivers a message more than once to ensure that all messages make it to a subscriber at least once.

Before running the following samples, make sure you uncomment and fill in any of the required values that are marked in the code. This is required to link the sample to your project and Pub/Sub resources that you created earlier

Use my-sub for your subscription ID.

For more examples that show how to pull messages, see Client Library code samples .

Python

  from 
  
 concurrent.futures 
  
 import 
 TimeoutError 
 from 
  
 google.cloud 
  
 import 
 pubsub_v1 
 # TODO(developer) 
 # project_id = "your-project-id" 
 # subscription_id = "your-subscription-id" 
 # Number of seconds the subscriber should listen for messages 
 # timeout = 5.0 
 subscriber 
 = 
 pubsub_v1 
 . 
  SubscriberClient 
 
 () 
 # The `subscription_path` method creates a fully qualified identifier 
 # in the form `projects/{project_id}/subscriptions/{subscription_id}` 
 subscription_path 
 = 
 subscriber 
 . 
 subscription_path 
 ( 
 project_id 
 , 
 subscription_id 
 ) 
 def 
  
 callback 
 ( 
 message 
 : 
 pubsub_v1 
 . 
 subscriber 
 . 
 message 
 . 
  Message 
 
 ) 
 - 
> None 
 : 
 print 
 ( 
 f 
 "Received 
 { 
 message 
 } 
 ." 
 ) 
 message 
 . 
  ack 
 
 () 
 streaming_pull_future 
 = 
  subscribe 
 
r . 
  subscribe 
 
 ( 
 subscription_path 
 , 
 callback 
 = 
 callback 
 ) 
 print 
 ( 
 f 
 "Listening for messages on 
 { 
 subscription_path 
 } 
 .. 
 \n 
 " 
 ) 
 # Wrap subscriber in a 'with' block to automatically call close() when done. 
 with 
 subscriber 
 : 
 try 
 : 
 # When `timeout` is not set, result() will block indefinitely, 
 # unless an exception is encountered first. 
 streaming_pull_future 
 . 
 result 
 ( 
 timeout 
 = 
 timeout 
 ) 
 except 
 TimeoutError 
 : 
 streaming_pull_future 
 . 
 cancel 
 () 
 # Trigger the shutdown. 
 streaming_pull_future 
 . 
 result 
 () 
 # Block until the shutdown is complete. 
 

C++

  #include 
  
 "google/cloud/pubsub/message.h" 
 #include 
  
 "google/cloud/pubsub/subscriber.h" 
 #include <iostream> 
 int 
  
 main 
 ( 
 int 
  
 argc 
 , 
  
 char 
 * 
  
 argv 
 []) 
  
 try 
  
 { 
  
 if 
  
 ( 
 argc 
  
 != 
  
 3 
 ) 
  
 { 
  
 std 
 :: 
 cerr 
 << 
 "Usage: " 
 << 
 argv 
 [ 
 0 
 ] 
 << 
 " <project-id> <subscription-id> 
 \n 
 " 
 ; 
  
 return 
  
 1 
 ; 
  
 } 
  
 std 
 :: 
 string 
  
 const 
  
 project_id 
  
 = 
  
 argv 
 [ 
 1 
 ]; 
  
 std 
 :: 
 string 
  
 const 
  
 subscription_id 
  
 = 
  
 argv 
 [ 
 2 
 ]; 
  
 auto 
  
 constexpr 
  
 kWaitTimeout 
  
 = 
  
 std 
 :: 
 chrono 
 :: 
 seconds 
 ( 
 30 
 ); 
  
 // Create a namespace alias to make the code easier to read. 
  
 namespace 
  
 pubsub 
  
 = 
  
 :: 
 google 
 :: 
 cloud 
 :: 
 pubsub 
 ; 
  
 auto 
  
 subscriber 
  
 = 
  
 pubsub 
 :: 
 Subscriber 
 ( 
 pubsub 
 :: 
 MakeSubscriberConnection 
 ( 
  
 pubsub 
 :: 
 Subscription 
 ( 
 project_id 
 , 
  
 subscription_id 
 ))); 
  
 auto 
  
 session 
  
 = 
  
 subscriber 
 . 
 Subscribe 
 ([&]( 
 pubsub 
 :: 
 Message 
  
 const 
&  
 m 
 , 
  
 pubsub 
 :: 
 AckHandler 
  
 h 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "Received message " 
 << 
 m 
 << 
 " 
 \n 
 " 
 ; 
  
 std 
 :: 
 move 
 ( 
 h 
 ). 
 ack 
 (); 
  
 }); 
  
 std 
 :: 
 cout 
 << 
 "Waiting for messages on " 
  
 + 
  
 subscription_id 
  
 + 
  
 "... 
 \n 
 " 
 ; 
  
 // Blocks until the timeout is reached. 
  
 auto 
  
 result 
  
 = 
  
 session 
 . 
 wait_for 
 ( 
 kWaitTimeout 
 ); 
  
 if 
  
 ( 
 result 
  
 == 
  
 std 
 :: 
 future_status 
 :: 
 timeout 
 ) 
  
 { 
  
 std 
 :: 
 cout 
 << 
 "timeout reached, ending session 
 \n 
 " 
 ; 
  
 session 
 . 
 cancel 
 (); 
  
 } 
  
 return 
  
 0 
 ; 
 } 
  
 catch 
  
 ( 
 google 
 :: 
 cloud 
 :: 
 Status 
  
 const 
&  
 status 
 ) 
  
 { 
  
 std 
 :: 
 cerr 
 << 
 "google::cloud::Status thrown: " 
 << 
 status 
 << 
 " 
 \n 
 " 
 ; 
  
 return 
  
 1 
 ; 
 } 
 

C#

  using 
  
  Google.Cloud.PubSub.V1 
 
 ; 
 using 
  
 System 
 ; 
 using 
  
 System.Threading 
 ; 
 using 
  
 System.Threading.Tasks 
 ; 
 public 
  
 class 
  
 PullMessagesAsyncSample 
 { 
  
 public 
  
 async 
  
 Task<int> 
  
 PullMessagesAsync 
 ( 
 string 
  
 projectId 
 , 
  
 string 
  
 subscriptionId 
 , 
  
 bool 
  
 acknowledge 
 ) 
  
 { 
  
  SubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  SubscriptionName 
 
 . 
  FromProjectSubscription 
 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
  SubscriberClient 
 
  
 subscriber 
  
 = 
  
 await 
  
  SubscriberClient 
 
 . 
  CreateAsync 
 
 ( 
 subscriptionName 
 ); 
  
 // SubscriberClient runs your message handle function on multiple 
  
 // threads to maximize throughput. 
  
 int 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 Task 
  
 startTask 
  
 = 
  
 subscriber 
 . 
  StartAsync 
 
 (( 
  PubsubMessage 
 
  
 message 
 , 
  
 CancellationToken 
  
 cancel 
 ) 
  
 = 
>  
 { 
  
 string 
  
 text 
  
 = 
  
 message 
 . 
 Data 
 . 
 ToStringUtf8 
 (); 
  
 Console 
 . 
 WriteLine 
 ( 
 $"Message {message.MessageId}: {text}" 
 ); 
  
 Interlocked 
 . 
 Increment 
 ( 
 ref 
  
 messageCount 
 ); 
  
 return 
  
 Task 
 . 
 FromResult 
 ( 
 acknowledge 
  
 ? 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Ack 
 
  
 : 
  
  SubscriberClient 
 
 . 
  Reply 
 
 . 
  Nack 
 
 ); 
  
 }); 
  
 // Run for 5 seconds. 
  
 await 
  
 Task 
 . 
 Delay 
 ( 
 5000 
 ); 
  
 await 
  
 subscriber 
 . 
  StopAsync 
 
 ( 
 CancellationToken 
 . 
 None 
 ); 
  
 // Lets make sure that the start task finished successfully after the call to stop. 
  
 await 
  
 startTask 
 ; 
  
 return 
  
 messageCount 
 ; 
  
 } 
 } 
 

Go

  import 
  
 ( 
  
 "context" 
  
 "fmt" 
  
 "io" 
  
 "sync/atomic" 
  
 "time" 
  
 "cloud.google.com/go/pubsub/v2" 
 ) 
 func 
  
 pullMsgs 
 ( 
 w 
  
 io 
 . 
 Writer 
 , 
  
 projectID 
 , 
  
 subID 
  
 string 
 ) 
  
 error 
  
 { 
  
 // projectID := "my-project-id" 
  
 // subID := "my-sub" 
  
 ctx 
  
 := 
  
 context 
 . 
 Background 
 () 
  
 client 
 , 
  
 err 
  
 := 
  
 pubsub 
 . 
 NewClient 
 ( 
 ctx 
 , 
  
 projectID 
 ) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "pubsub.NewClient: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 defer 
  
 client 
 . 
 Close 
 () 
  
 // client.Subscriber can be passed a subscription ID (e.g. "my-sub") or 
  
 // a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub"). 
  
 // If a subscription ID is provided, the project ID from the client is used. 
  
 sub 
  
 := 
  
 client 
 . 
 Subscriber 
 ( 
 subID 
 ) 
  
 // Receive messages for 10 seconds, which simplifies testing. 
  
 // Comment this out in production, since `Receive` should 
  
 // be used as a long running operation. 
  
 ctx 
 , 
  
 cancel 
  
 := 
  
 context 
 . 
 WithTimeout 
 ( 
 ctx 
 , 
  
 10 
 * 
 time 
 . 
 Second 
 ) 
  
 defer 
  
 cancel 
 () 
  
 var 
  
 received 
  
 int32 
  
 err 
  
 = 
  
 sub 
 . 
 Receive 
 ( 
 ctx 
 , 
  
 func 
 ( 
 _ 
  
 context 
 . 
 Context 
 , 
  
 msg 
  
 * 
 pubsub 
 . 
 Message 
 ) 
  
 { 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Got message: %q\n" 
 , 
  
 string 
 ( 
 msg 
 . 
 Data 
 )) 
  
 atomic 
 . 
 AddInt32 
 ( 
& received 
 , 
  
 1 
 ) 
  
 msg 
 . 
 Ack 
 () 
  
 }) 
  
 if 
  
 err 
  
 != 
  
 nil 
  
 { 
  
 return 
  
 fmt 
 . 
 Errorf 
 ( 
 "sub.Receive: %w" 
 , 
  
 err 
 ) 
  
 } 
  
 fmt 
 . 
 Fprintf 
 ( 
 w 
 , 
  
 "Received %d messages\n" 
 , 
  
 received 
 ) 
  
 return 
  
 nil 
 } 
 

Java

  import 
  
 com.google.cloud.pubsub.v1. AckReplyConsumer 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. MessageReceiver 
 
 ; 
 import 
  
 com.google.cloud.pubsub.v1. Subscriber 
 
 ; 
 import 
  
 com.google.pubsub.v1. ProjectSubscriptionName 
 
 ; 
 import 
  
 com.google.pubsub.v1. PubsubMessage 
 
 ; 
 import 
  
 java.util.concurrent.TimeUnit 
 ; 
 import 
  
 java.util.concurrent.TimeoutException 
 ; 
 public 
  
 class 
 SubscribeAsyncExample 
  
 { 
  
 public 
  
 static 
  
 void 
  
 main 
 ( 
 String 
 ... 
  
 args 
 ) 
  
 throws 
  
 Exception 
  
 { 
  
 // TODO(developer): Replace these variables before running the sample. 
  
 String 
  
 projectId 
  
 = 
  
 "your-project-id" 
 ; 
  
 String 
  
 subscriptionId 
  
 = 
  
 "your-subscription-id" 
 ; 
  
 subscribeAsyncExample 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 } 
  
 public 
  
 static 
  
 void 
  
 subscribeAsyncExample 
 ( 
 String 
  
 projectId 
 , 
  
 String 
  
 subscriptionId 
 ) 
  
 { 
  
  ProjectSubscriptionName 
 
  
 subscriptionName 
  
 = 
  
  ProjectSubscriptionName 
 
 . 
 of 
 ( 
 projectId 
 , 
  
 subscriptionId 
 ); 
  
 // Instantiate an asynchronous message receiver. 
  
  MessageReceiver 
 
  
 receiver 
  
 = 
  
 ( 
 PubsubMessage 
  
 message 
 , 
  
 AckReplyConsumer 
  
 consumer 
 ) 
  
 - 
>  
 { 
  
 // Handle incoming message, then ack the received message. 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Id: " 
  
 + 
  
 message 
 . 
 getMessageId 
 ()); 
  
 System 
 . 
 out 
 . 
 println 
 ( 
 "Data: " 
  
 + 
  
 message 
 . 
 getData 
 (). 
 toStringUtf8 
 ()); 
  
 consumer 
 . 
 ack 
 (); 
  
 }; 
  
  Subscriber 
 
  
 subscriber 
  
 = 
  
 null 
 ; 
  
 try 
  
 { 
  
 subscriber 
  
 = 
  
  Subscriber 
 
 . 
 newBuilder 
 ( 
 subscriptionName 
 , 
  
 receiver 
 ). 
 build 
 (); 
  
 // Start the subscriber. 
  
 subscriber 
 . 
  startAsync 
 
 (). 
 awaitRunning 
 (); 
  
 System 
 . 
 out 
 . 
 printf 
 ( 
 "Listening for messages on %s:\n" 
 , 
  
 subscriptionName 
 . 
  toString 
 
 ()); 
  
 // Allow the subscriber to run for 30s unless an unrecoverable error occurs. 
  
 subscriber 
 . 
 awaitTerminated 
 ( 
 30 
 , 
  
 TimeUnit 
 . 
 SECONDS 
 ); 
  
 } 
  
 catch 
  
 ( 
 TimeoutException 
  
 timeoutException 
 ) 
  
 { 
  
 // Shut down the subscriber after 30s. Stop receiving messages. 
  
 subscriber 
 . 
 stopAsync 
 (); 
  
 } 
  
 } 
 } 
 

Node.js

  /** 
 * TODO(developer): Uncomment these variables before running the sample. 
 */ 
 // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; 
 // const timeout = 60; 
 // Imports the Google Cloud client library 
 const 
  
 { 
 PubSub 
 } 
  
 = 
  
 require 
 ( 
 ' @google-cloud/pubsub 
' 
 ); 
 // Creates a client; cache this for further use 
 const 
  
 pubSubClient 
  
 = 
  
 new 
  
  PubSub 
 
 (); 
 function 
  
 listenForMessages 
 ( 
 subscriptionNameOrId 
 , 
  
 timeout 
 ) 
  
 { 
  
 // References an existing subscription; if you are unsure if the 
  
 // subscription will exist, try the optimisticSubscribe sample. 
  
 const 
  
 subscription 
  
 = 
  
 pubSubClient 
 . 
 subscription 
 ( 
 subscriptionNameOrId 
 ); 
  
 // Create an event handler to handle messages 
  
 let 
  
 messageCount 
  
 = 
  
 0 
 ; 
  
 const 
  
 messageHandler 
  
 = 
  
 message 
  
 = 
>  
 { 
  
 console 
 . 
 log 
 ( 
 `Received message 
 ${ 
 message 
 . 
 id 
 } 
 :` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tData: 
 ${ 
 message 
 . 
 data 
 } 
 ` 
 ); 
  
 console 
 . 
 log 
 ( 
 `\tAttributes: 
 ${ 
 message 
 . 
 attributes 
 } 
 ` 
 ); 
  
 messageCount 
  
 += 
  
 1 
 ; 
  
 // "Ack" (acknowledge receipt of) the message 
  
 message 
 . 
  ack 
 
 (); 
  
 }; 
  
 // Listen for new messages until timeout is hit 
  
 subscripti on 
 
 . 
  on 
 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 // Wait a while for the subscription to run. (Part of the sample only.) 
  
 setTimeout 
 (() 
  
 = 
>  
 { 
  
 subscription 
 . 
 removeListener 
 ( 
 'message' 
 , 
  
 messageHandler 
 ); 
  
 console 
 . 
 log 
 ( 
 ` 
 ${ 
 messageCount 
 } 
 message(s) received.` 
 ); 
  
 }, 
  
 timeout 
  
 * 
  
 1000 
 ); 
 } 
 

PHP

  use Google\Cloud\PubSub\PubSubClient; 
 /** 
 * Pulls all Pub/Sub messages for a subscription. 
 * 
 * @param string $projectId  The Google project ID. 
 * @param string $subscriptionName  The Pub/Sub subscription name. 
 */ 
 function pull_messages($projectId, $subscriptionName) 
 { 
 $pubsub = new PubSubClient([ 
 'projectId' => $projectId, 
 ]); 
 $subscription = $pubsub->subscription($subscriptionName); 
 foreach ($subscription->pull() as $message) { 
 printf('Message: %s' . PHP_EOL, $message->data()); 
 // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times. 
 $subscription->acknowledge($message); 
 } 
 } 
 

Ruby

  # subscription_id = "your-subscription-id" 
 pubsub 
  
 = 
  
 Google 
 :: 
 Cloud 
 :: 
  PubSub 
 
 . 
  new 
 
 subscriber 
  
 = 
  
 pubsub 
 . 
  subscriber 
 
  
 subscription_id 
 listener 
  
 = 
  
 subscriber 
 . 
  listen 
 
  
 do 
  
 | 
 received_message 
 | 
  
 puts 
  
 "Received message: 
 #{ 
 received_message 
 . 
 data 
 } 
 " 
  
 received_message 
 . 
  acknowledge! 
 
 end 
 listener 
 . 
  start 
 
 # Let the main thread sleep for 60 seconds so the thread for listening 
 # messages does not quit 
 sleep 
  
 60 
 listener 
 . 
 stop 
 . 
 wait! 
 

How did it go?

Clean up (optional)

  1. To avoid incurring charges to your Google Cloud account for the resources used in this guide, you can use the command line to delete the topic and subscription.
      
    gcloud  
    pubsub  
    subscriptions  
    delete  
    my-sub  
    gcloud  
    pubsub  
    topics  
    delete  
    my-topic
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud  
    auth  
    application-default  
    revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud  
    auth  
    revoke

What's next

Create a Mobile Website
View Site in Mobile | Classic
Share by: