Skip to content

Commit af42ab2

Browse files
committed
Initial commit.
1 parent 35b153f commit af42ab2

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

pubsub/subscriptions.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,62 @@ function pullMessages (subscriptionName, callback) {
168168
}
169169
// [END pubsub_pull_messages]
170170

171+
let subscribeCounterValue = 1;
172+
173+
function getSubscribeCounterValue () {
174+
return subscribeCounterValue;
175+
}
176+
177+
function setSubscribeCounterValue (value) {
178+
subscribeCounterValue = value;
179+
}
180+
181+
// [START pubsub_pull_ordered_messages]
182+
var outstandingMessages = {};
183+
184+
function pullOrderedMessages (subscriptionName, callback) {
185+
// References an existing subscription, e.g. "my-subscription"
186+
const subscription = pubsubClient.subscription(subscriptionName);
187+
188+
// Pulls messages. Set returnImmediately to false to block until messages are
189+
// received.
190+
subscription.pull({ returnImmediately: true }, (err, messages) => {
191+
if (err) {
192+
callback(err);
193+
return;
194+
}
195+
196+
// Sort messages in order of increasing messageId
197+
messages.sort((a, b) => b.messageId - a.messageId);
198+
199+
// Iterate over messages in order of increasing messageId
200+
messages.forEach((message) => outstandingMessages[message.messageId] = message);
201+
202+
const outstandingMessageIds = Object.keys(outstandingMessages);
203+
outstandingMessageIds.sort();
204+
205+
outstandingMessageIds.forEach((messageId) => {
206+
const counter = getSubscribeCounterValue();
207+
const message = outstandingMessages[messageId];
208+
209+
if (messageId < counter) {
210+
// The message has already been processed
211+
subscription.ack(message.ackId);
212+
delete outstandingMessages[messageId];
213+
} else if (messageId === counter) {
214+
handleMessage(message);
215+
setSubscribeCounterValue(messageId + 1);
216+
subscription.ack(message.ackId);
217+
delete outstandingMessages[messageId];
218+
} else {
219+
// Have not yet processed the message on which this message is dependent
220+
return false;
221+
}
222+
});
223+
});
224+
}
225+
// [END pubsub_pull_ordered_messages]
226+
171227
// [START pubsub_get_subscription_policy]
172228
function getSubscriptionPolicy (subscriptionName, callback) {
173229
// References an existing subscription, e.g. "my-subscription"
@@ -255,6 +311,7 @@ const program = module.exports = {
255311
deleteSubscription: deleteSubscription,
256312
getSubscriptionMetadata: getSubscriptionMetadata,
257313
pullMessages: pullMessages,
314+
pullOrderedMessages: pullOrderedMessages,
258315
getSubscriptionPolicy: getSubscriptionPolicy,
259316
setSubscriptionPolicy: setSubscriptionPolicy,
260317
testSubscriptionPermissions: testSubscriptionPermissions,

pubsub/topics.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,52 @@ function publishMessage (topicName, data, callback) {
105105
}
106106
// [END pubsub_publish_message]
107107

108+
let publishCounterValue = 1;
109+
110+
function getPublishCounterValue () {
111+
return publishCounterValue;
112+
}
113+
114+
function setPublishCounterValue (value) {
115+
publishCounterValue = value;
116+
}
117+
118+
// [START pubsub_publish_ordered_message]
119+
function publishOrderedMessage (topicName, data, callback) {
120+
// References an existing topic, e.g. "my-topic"
121+
const topic = pubsubClient.topic(topicName);
122+
123+
/**
124+
* In Node.js, a PubSub message requires a "data" property, which can have a
125+
* string or an object as its value. An optional "attributes" property can be
126+
* an object of key/value pairs, where the keys and values are both strings.
127+
* See https://cloud.google.com/pubsub/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
128+
*
129+
* Topic#publish() takes either a single message object or an array of message
130+
* objects. See https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/latest/pubsub/topic?method=publish
131+
*/
132+
const message = {
133+
data: data,
134+
135+
// Assign an id to the message
136+
messageId: getPublishCounterValue()
137+
};
138+
139+
topic.publish(message, (err, messageIds) => {
140+
if (err) {
141+
callback(err);
142+
return;
143+
}
144+
145+
// Update the counter value
146+
setPublishCounterValue(message.messageId + 1);
147+
148+
console.log(`Message ${messageIds[0]} published.`);
149+
callback();
150+
});
151+
}
152+
// [END pubsub_publish_ordered_message]
153+
108154
// [START pubsub_get_topic_policy]
109155
function getTopicPolicy (topicName, callback) {
110156
// References an existing topic, e.g. "my-topic"
@@ -191,10 +237,16 @@ const program = module.exports = {
191237
createTopic: createTopic,
192238
deleteTopic: deleteTopic,
193239
publishMessage: publishMessage,
240+
<<<<<<< 35b153f2b1c74343053f4e9d6dd9951cca326f73
194241
getTopicPolicy: getTopicPolicy,
195242
setTopicPolicy: setTopicPolicy,
196243
testTopicPermissions: testTopicPermissions,
197244
main: (args) => {
245+
=======
246+
publishOrderedMessage: publishOrderedMessage,
247+
listTopics: listTopics,
248+
main: function (args) {
249+
>>>>>>> Initial commit.
198250
// Run the command-line program
199251
cli.help().strict().parse(args).argv;
200252
}

0 commit comments

Comments
 (0)