Publish a message to a Pub/Sub topic

You can use a Workflows connector to support Pub/Sub operations, including publishing messages to a Pub/Sub topic.

A Pub/Sub topic is a resource to which messages are sent by publishers. A subscription represents the stream of messages from a topic that are to be delivered to the subscribing application. Learn more about Pub/Sub.

Publishing messages

Once a Pub/Sub topic and a subscription to that topic have been created, you can create a workflow that publishes a message to that topic:

YAML

- init:
    assign:
      - project: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
      - topic: TOPIC_ID
      - subscription: SUBSCRIPTION_ID
      - message:
          hello: world
      - base64Msg: '${base64.encode(json.encode(message))}'
- publish_message_to_topic:
    call: googleapis.pubsub.v1.projects.topics.publish
    args:
      topic: '${"projects/" + project + "/topics/" + topic}'
      body:
        messages:
          - data: '${base64Msg}'

JSON

[
  {
    "init": {
      "assign": [
        {
          "project": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
        },
        {
          "topic": "TOPIC_ID"
        },
        {
          "subscription": "SUBSCRIPTION_ID"
        },
        {
          "message": {
            "hello": "world"
          }
        },
        {
          "base64Msg": "${base64.encode(json.encode(message))}"
        }
      ]
    }
  },
  {
    "publish_message_to_topic": {
      "call": "googleapis.pubsub.v1.projects.topics.publish",
      "args": {
        "topic": "${\"projects/\" + project + \"/topics/\" + topic}",
        "body": {
          "messages": [
            {
              "data": "${base64Msg}"
            }
          ]
        }
      }
    }
  }
]

Replace the following:

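  • TOPIC_ID: the ID of the Pub/Sub topic. The workflow builds the fully qualified name (projects/PROJECT_ID/topics/TOPIC_ID) from this value, so supply the ID only.

  • SUBSCRIPTION_ID: the ID of the Pub/Sub subscription. The workflow builds the fully qualified name (projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID) from this value when pulling messages.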
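The Pub/Sub publish method returns a response that lists the server-assigned ID of each published message. As a minimal sketch, assuming the init step has already assigned project, topic, and base64Msg, the publish step could store that response and return the IDs. The variable name publishResult and the step name return_message_ids are illustrative and not part of the original example.

```yaml
- publish_message_to_topic:
    call: googleapis.pubsub.v1.projects.topics.publish
    args:
      topic: '${"projects/" + project + "/topics/" + topic}'
      body:
        messages:
          - data: '${base64Msg}'
    # publishResult is a hypothetical variable name that holds the API response.
    result: publishResult
- return_message_ids:
    # The publish response contains one message ID per published message.
    return: '${publishResult.messageIds}'
```

Returning the IDs is one way to confirm from the execution result that the message was accepted by Pub/Sub.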

Pulling messages

You can create an Eventarc trigger that connects a Pub/Sub topic to a Workflows event receiver. A message is published to a Pub/Sub topic to generate an event, and the event is passed as a runtime argument to the destination workflow. For more information, see Trigger a workflow with events or Pub/Sub messages.
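When a workflow is triggered this way, the Pub/Sub message arrives inside the runtime argument rather than through a pull call. The following is a minimal sketch of such a destination workflow, assuming the event carries the base64-encoded payload under data.message.data and that the payload is UTF-8 text. The parameter name event, the step names, and the variable names are illustrative.

```yaml
main:
  params: [event]
  steps:
    - decode_pubsub_message:
        assign:
          # Decode the base64 payload to bytes, then the bytes to a string.
          - decodedBytes: '${base64.decode(event.data.message.data)}'
          - message: '${text.decode(decodedBytes)}'
    - return_pubsub_message:
        return: '${message}'
```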

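You can also create a workflow that pulls the Pub/Sub message. In the following example, the workflow polls the subscription, waiting 60 seconds between attempts, until a message is available. The example reuses the project and subscription variables assigned in the init step of the publishing workflow.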

YAML

- pullMessage:
    call: googleapis.pubsub.v1.projects.subscriptions.pull
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        maxMessages: 1
    result: m
- checkState:
    switch:
      - condition: '${m.receivedMessages[0].message.data != ""}'
        next: outputMessage
- wait:
    call: sys.sleep
    args:
      seconds: 60
    next: pullMessage
- outputMessage:
    return: '${json.decode(base64.decode(m.receivedMessages[0].message.data))}'

JSON

[
  {
    "pullMessage": {
      "call": "googleapis.pubsub.v1.projects.subscriptions.pull",
      "args": {
        "subscription": "${\"projects/\" + project + \"/subscriptions/\" + subscription}",
        "body": {
          "maxMessages": 1
        }
      },
      "result": "m"
    }
  },
  {
    "checkState": {
      "switch": [
        {
          "condition": "${m.receivedMessages[0].message.data != \"\"}",
          "next": "outputMessage"
        }
      ]
    }
  },
  {
    "wait": {
      "call": "sys.sleep",
      "args": {
        "seconds": 60
      },
      "next": "pullMessage"
    }
  },
  {
    "outputMessage": {
      "return": "${json.decode(base64.decode(m.receivedMessages[0].message.data))}"
    }
  }
]
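The pull example reads receivedMessages[0] directly and does not acknowledge the message, so Pub/Sub redelivers it after the acknowledgement deadline passes. The following YAML sketch is one possible variant, not the documented example. It assumes that a pull returning no messages omits the receivedMessages field from the response, and that project and subscription are assigned as in the init step. The step name ackMessage is illustrative.

```yaml
- pullMessage:
    call: googleapis.pubsub.v1.projects.subscriptions.pull
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        maxMessages: 1
    result: m
- checkState:
    switch:
      # Assumption: an empty pull response has no receivedMessages key.
      - condition: '${"receivedMessages" in m}'
        next: ackMessage
- wait:
    call: sys.sleep
    args:
      seconds: 60
    next: pullMessage
- ackMessage:
    # Acknowledge the message so that Pub/Sub does not redeliver it.
    call: googleapis.pubsub.v1.projects.subscriptions.acknowledge
    args:
      subscription: '${"projects/" + project + "/subscriptions/" + subscription}'
      body:
        ackIds:
          - '${m.receivedMessages[0].ackId}'
- outputMessage:
    return: '${json.decode(base64.decode(m.receivedMessages[0].message.data))}'
```

Acknowledging before returning means the message is not processed again if the workflow is re-run. If the payload must survive a failure in later steps, acknowledging after those steps complete may be the safer ordering.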