You can use a Workflows connector to support Pub/Sub operations, including publishing messages to a Pub/Sub topic.
A Pub/Sub topic is a resource to which messages are sent by publishers. A subscription represents the stream of messages from a topic that are to be delivered to the subscribing application. Learn more about Pub/Sub.
Publishing messages
Once a Pub/Sub topic and a subscription to that topic have been created, you can create a workflow that publishes a message to that topic:
YAML
- init:
    assign:
      - project: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
      - topic: TOPIC_ID
      - subscription: SUBSCRIPTION_ID
      - message:
          hello: world
      - base64Msg: '${base64.encode(json.encode(message))}'
- publish_message_to_topic:
    call: googleapis.pubsub.v1.projects.topics.publish
    args:
      topic: '${"projects/" + project + "/topics/" + topic}'
      body:
        messages:
          - data: '${base64Msg}'
JSON
[
  {
    "init": {
      "assign": [
        { "project": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" },
        { "topic": "TOPIC_ID" },
        { "subscription": "SUBSCRIPTION_ID" },
        { "message": { "hello": "world" } },
        { "base64Msg": "${base64.encode(json.encode(message))}" }
      ]
    }
  },
  {
    "publish_message_to_topic": {
      "call": "googleapis.pubsub.v1.projects.topics.publish",
      "args": {
        "topic": "${\"projects/\" + project + \"/topics/\" + topic}",
        "body": {
          "messages": [
            { "data": "${base64Msg}" }
          ]
        }
      }
    }
  }
]
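Replace the following:
- TOPIC_ID: the ID of the Pub/Sub topic. The workflow builds the fully qualified name (projects/PROJECT_ID/topics/TOPIC_ID) from this ID, so supply the ID only.
- SUBSCRIPTION_ID: the ID of the Pub/Sub subscription. The workflow builds the fully qualified name (projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID) from this ID, so supply the ID only.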
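A publish call returns the IDs that Pub/Sub assigned to the published messages, and each message can carry string attributes alongside its data. The following is a minimal sketch, assuming the connector returns the standard Pub/Sub publish response with a messageIds field. The step names publish_with_attributes and return_message_ids, the result variable publishResult, and the origin attribute are hypothetical names chosen for illustration.

```yaml
- init:
    assign:
      - project: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
      - topic: TOPIC_ID
      - message:
          hello: world
      # Pub/Sub expects message data to be base64-encoded.
      - base64Msg: '${base64.encode(json.encode(message))}'
- publish_with_attributes:
    call: googleapis.pubsub.v1.projects.topics.publish
    args:
      topic: '${"projects/" + project + "/topics/" + topic}'
      body:
        messages:
          - data: '${base64Msg}'
            # Optional string key-value pairs (hypothetical example attribute).
            attributes:
              origin: workflows
    # Capture the response so that later steps can read it.
    result: publishResult
- return_message_ids:
    # Assumes the response contains a messageIds list, one ID per published message.
    return: '${publishResult.messageIds}'
```

Returning the message IDs makes it easier to correlate a workflow execution with the messages it published, for example when inspecting logs.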
Pulling messages
You can create an Eventarc trigger that connects a Pub/Sub topic to a Workflows event receiver. A message is published to a Pub/Sub topic to generate an event, and the event is passed as a runtime argument to the destination workflow. For more information, see Trigger a workflow with events or Pub/Sub messages.
You can also create a workflow that pulls a Pub/Sub message from a subscription. In the following example, the workflow polls the subscription until a message is available. The example uses the project and subscription variables assigned in the init step of the publishing workflow:
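As an illustration of the event-driven approach, the destination workflow can read the Pub/Sub message from its runtime argument. The following is a minimal sketch, assuming the event delivers the base64-encoded payload at event.data.message.data and that the payload is UTF-8 text. The parameter name event and the step and variable names are illustrative choices.

```yaml
main:
  # The event is passed to the workflow as a runtime argument.
  params: [event]
  steps:
    - decode_pubsub_message:
        assign:
          # Assumed location of the base64-encoded message payload in the event.
          - decodedBytes: '${base64.decode(event.data.message.data)}'
          # Convert the decoded bytes to a string.
          - message: '${text.decode(decodedBytes)}'
    - return_pubsub_message:
        return: '${message}'
```

With a trigger, Pub/Sub pushes each message to the workflow as it arrives, so the workflow does not need a wait loop.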
YAML
- pullMessage:
    call: googleapis.pubsub.v1.projects.subscriptions.pull
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        maxMessages: 1
    result: m
- checkState:
    switch:
      - condition: ${m.receivedMessages[0].message.data != ""}
        next: outputMessage
- wait:
    call: sys.sleep
    args:
      seconds: 60
    next: pullMessage
- outputMessage:
    return: '${json.decode(base64.decode(m.receivedMessages[0].message.data))}'
JSON
[
  {
    "pullMessage": {
      "call": "googleapis.pubsub.v1.projects.subscriptions.pull",
      "args": {
        "subscription": "${\"projects/\" + project + \"/subscriptions/\" + subscription}",
        "body": {
          "maxMessages": 1
        }
      },
      "result": "m"
    }
  },
  {
    "checkState": {
      "switch": [
        {
          "condition": "${m.receivedMessages[0].message.data != \"\"}",
          "next": "outputMessage"
        }
      ]
    }
  },
  {
    "wait": {
      "call": "sys.sleep",
      "args": {
        "seconds": 60
      },
      "next": "pullMessage"
    }
  },
  {
    "outputMessage": {
      "return": "${json.decode(base64.decode(m.receivedMessages[0].message.data))}"
    }
  }
]
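A pulled message stays outstanding until it is acknowledged, and Pub/Sub can redeliver it after the acknowledgement deadline passes. The following variation is a sketch rather than a definitive implementation. It assumes that the pull response omits the receivedMessages field when no message is available, so it checks for that key before reading the message, and it acknowledges the message before returning it. The step name ackMessage is a hypothetical name chosen for illustration.

```yaml
- init:
    assign:
      - project: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
      - subscription: SUBSCRIPTION_ID
- pullMessage:
    call: googleapis.pubsub.v1.projects.subscriptions.pull
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        maxMessages: 1
    result: m
- checkState:
    switch:
      # Assumption: receivedMessages is absent when the pull returns nothing.
      - condition: '${"receivedMessages" in m}'
        next: ackMessage
- wait:
    # No message yet: sleep, then poll again.
    call: sys.sleep
    args:
      seconds: 60
    next: pullMessage
- ackMessage:
    # Acknowledge the message so that Pub/Sub does not redeliver it.
    call: googleapis.pubsub.v1.projects.subscriptions.acknowledge
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        ackIds:
          - '${m.receivedMessages[0].ackId}'
- outputMessage:
    return: '${json.decode(base64.decode(m.receivedMessages[0].message.data))}'
```

Acknowledging before returning trades safety for simplicity: if a later step fails, the message is not redelivered. Move the acknowledgement after any processing steps if the message must survive a failed execution.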

