Skip to content

Commit 38ffc4f

Browse files
committed
Add message ordering sample.
1 parent 2724942 commit 38ffc4f

File tree

6 files changed

+165
-43
lines changed

6 files changed

+165
-43
lines changed

pubsub/subscriptions.js

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323

2424
'use strict';
2525

26-
const pubsubClient = require(`@google-cloud/pubsub`)();
26+
const PubSub = require(`@google-cloud/pubsub`);
2727

2828
// [START pubsub_list_subscriptions]
2929
function listSubscriptions (callback) {
30+
// Instantiates the client library
31+
const pubsubClient = PubSub();
32+
3033
// Lists all subscriptions in the current project
3134
pubsubClient.getSubscriptions((err, subscriptions) => {
3235
if (err) {
@@ -43,6 +46,9 @@ function listSubscriptions (callback) {
4346

4447
// [START pubsub_list_topic_subscriptions]
4548
function listTopicSubscriptions (topicName, callback) {
49+
// Instantiates the client library
50+
const pubsubClient = PubSub();
51+
4652
// References an existing topic, e.g. "my-topic"
4753
const topic = pubsubClient.topic(topicName);
4854

@@ -62,6 +68,9 @@ function listTopicSubscriptions (topicName, callback) {
6268

6369
// [START pubsub_create_subscription]
6470
function createSubscription (topicName, subscriptionName, callback) {
71+
// Instantiates the client library
72+
const pubsubClient = PubSub();
73+
6574
// References an existing topic, e.g. "my-topic"
6675
const topic = pubsubClient.topic(topicName);
6776

@@ -80,16 +89,18 @@ function createSubscription (topicName, subscriptionName, callback) {
8089

8190
// [START pubsub_create_push_subscription]
8291
function createPushSubscription (topicName, subscriptionName, callback) {
92+
// Instantiates the client library
93+
const pubsubClient = PubSub();
94+
8395
// References an existing topic, e.g. "my-topic"
8496
const topic = pubsubClient.topic(topicName);
85-
const projectId = process.env.GCLOUD_PROJECT || 'YOU_PROJECT_ID';
8697

8798
// Creates a new push subscription, e.g. "my-new-subscription"
8899
topic.subscribe(subscriptionName, {
89100
pushConfig: {
90101
// Set to an HTTPS endpoint of your choice. If necessary, register
91102
// (authorize) the domain on which the server is hosted.
92-
pushEndpoint: `https://${projectId}.appspot.com/push`
103+
pushEndpoint: `https://${pubsubClient.projectId}.appspot.com/push`
93104
}
94105
}, (err, subscription) => {
95106
if (err) {
@@ -105,6 +116,9 @@ function createPushSubscription (topicName, subscriptionName, callback) {
105116

106117
// [START pubsub_delete_subscription]
107118
function deleteSubscription (subscriptionName, callback) {
119+
// Instantiates the client library
120+
const pubsubClient = PubSub();
121+
108122
// References an existing subscription, e.g. "my-subscription"
109123
const subscription = pubsubClient.subscription(subscriptionName);
110124

@@ -123,6 +137,9 @@ function deleteSubscription (subscriptionName, callback) {
123137

124138
// [START pubsub_get_subscription_metadata]
125139
function getSubscriptionMetadata (subscriptionName, callback) {
140+
// Instantiates the client library
141+
const pubsubClient = PubSub();
142+
126143
// References an existing subscription, e.g. "my-subscription"
127144
const subscription = pubsubClient.subscription(subscriptionName);
128145

@@ -144,6 +161,9 @@ function getSubscriptionMetadata (subscriptionName, callback) {
144161

145162
// [START pubsub_pull_messages]
146163
function pullMessages (subscriptionName, callback) {
164+
// Instantiates the client library
165+
const pubsubClient = PubSub();
166+
147167
// References an existing subscription, e.g. "my-subscription"
148168
const subscription = pubsubClient.subscription(subscriptionName);
149169

@@ -179,9 +199,12 @@ function setSubscribeCounterValue (value) {
179199
}
180200

181201
// [START pubsub_pull_ordered_messages]
182-
var outstandingMessages = {};
202+
const outstandingMessages = {};
183203

184204
function pullOrderedMessages (subscriptionName, callback) {
205+
// Instantiates the client library
206+
const pubsubClient = PubSub();
207+
185208
// References an existing subscription, e.g. "my-subscription"
186209
const subscription = pubsubClient.subscription(subscriptionName);
187210

@@ -193,39 +216,43 @@ function pullOrderedMessages (subscriptionName, callback) {
193216
return;
194217
}
195218

196-
// Sort messages in order of increasing messageId
197-
messages.sort((a, b) => b.messageId - a.messageId);
198-
199-
// Iterate over messages in order of increasing messageId
200-
messages.forEach((message) => outstandingMessages[message.messageId] = message);
219+
messages.forEach((message) => {
220+
outstandingMessages[message.attributes.orderId] = message;
221+
});
201222

202-
const outstandingMessageIds = Object.keys(outstandingMessages);
203-
outstandingMessageIds.sort();
223+
const outstandingIds = Object.keys(outstandingMessages).map((orderId) => +orderId);
224+
outstandingIds.sort();
204225

205-
outstandingMessageIds.forEach((messageId) => {
226+
outstandingIds.forEach((orderId) => {
206227
const counter = getSubscribeCounterValue();
207-
const message = outstandingMessages[messageId];
228+
const message = outstandingMessages[orderId];
208229

209-
if (messageId < counter) {
230+
if (orderId < counter) {
210231
// The message has already been processed
211232
subscription.ack(message.ackId);
212-
delete outstandingMessages[messageId];
213-
} else if (messageId === counter) {
214-
handleMessage(message);
215-
setSubscribeCounterValue(messageId + 1);
233+
delete outstandingMessages[orderId];
234+
} else if (orderId === counter) {
235+
// Process the message
236+
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
237+
238+
setSubscribeCounterValue(orderId + 1);
216239
subscription.ack(message.ackId);
217-
delete outstandingMessages[messageId];
240+
delete outstandingMessages[orderId];
218241
} else {
219242
// Have not yet processed the message on which this message is dependent
220243
return false;
221244
}
222245
});
246+
callback();
223247
});
224248
}
225249
// [END pubsub_pull_ordered_messages]
226250

227251
// [START pubsub_get_subscription_policy]
228252
function getSubscriptionPolicy (subscriptionName, callback) {
253+
// Instantiates the client library
254+
const pubsubClient = PubSub();
255+
229256
// References an existing subscription, e.g. "my-subscription"
230257
const subscription = pubsubClient.subscription(subscriptionName);
231258

@@ -244,6 +271,9 @@ function getSubscriptionPolicy (subscriptionName, callback) {
244271

245272
// [START pubsub_set_subscription_policy]
246273
function setSubscriptionPolicy (subscriptionName, callback) {
274+
// Instantiates the client library
275+
const pubsubClient = PubSub();
276+
247277
// References an existing subscription, e.g. "my-subscription"
248278
const subscription = pubsubClient.subscription(subscriptionName);
249279

@@ -278,6 +308,9 @@ function setSubscriptionPolicy (subscriptionName, callback) {
278308

279309
// [START pubsub_test_subscription_permissions]
280310
function testSubscriptionPermissions (subscriptionName, callback) {
311+
// Instantiates the client library
312+
const pubsubClient = PubSub();
313+
281314
// References an existing subscription, e.g. "my-subscription"
282315
const subscription = pubsubClient.subscription(subscriptionName);
283316

pubsub/system-test/subscriptions.test.js

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,46 @@ describe(`pubsub:subscriptions`, () => {
108108
`* ${messageIds[0]} "${expected}" {}`;
109109
assert.equal(output, expectedOutput);
110110
done();
111-
}, 5000);
111+
}, 2000);
112+
});
113+
});
114+
115+
it(`should pull ordered messages`, (done) => {
116+
const subscriptions = require('../subscriptions');
117+
const expected = `Hello, world!`;
118+
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '3' } }, (err, firstMessageIds) => {
119+
assert.ifError(err);
120+
setTimeout(() => {
121+
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
122+
assert.ifError(err);
123+
assert.equal(console.log.callCount, 0);
124+
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err, secondMessageIds) => {
125+
assert.ifError(err);
126+
setTimeout(() => {
127+
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
128+
assert.ifError(err);
129+
assert.equal(console.log.callCount, 1);
130+
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, secondMessageIds[0], expected, { orderId: '1' }]);
131+
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err) => {
132+
assert.ifError(err);
133+
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '2' } }, (err, thirdMessageIds) => {
134+
assert.ifError(err);
135+
setTimeout(() => {
136+
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
137+
assert.ifError(err);
138+
assert.equal(console.log.callCount, 3);
139+
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, thirdMessageIds[0], expected, { orderId: '2' }]);
140+
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, firstMessageIds[0], expected, { orderId: '3' }]);
141+
done();
142+
});
143+
}, 2000);
144+
});
145+
});
146+
});
147+
}, 2000);
148+
});
149+
});
150+
}, 2000);
112151
});
113152
});
114153

pubsub/system-test/topics.test.js

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,24 +61,52 @@ describe(`pubsub:topics`, () => {
6161
pubsub.topic(topicName).subscribe(subscriptionName, (err, subscription) => {
6262
assert.ifError(err);
6363
run(`${cmd} publish ${topicName} "${message.data}"`, cwd);
64-
subscription.pull((err, messages) => {
65-
assert.ifError(err);
66-
console.log(JSON.stringify(messages, null, 2));
67-
assert.equal(messages[0].data, message.data);
68-
done();
69-
});
64+
setTimeout(() => {
65+
subscription.pull((err, messages) => {
66+
assert.ifError(err);
67+
assert.equal(messages[0].data, message.data);
68+
done();
69+
});
70+
}, 2000);
7071
});
7172
});
7273

7374
it(`should publish a JSON message`, (done) => {
7475
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
7576
assert.ifError(err);
7677
run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd);
77-
subscription.pull((err, messages) => {
78-
assert.ifError(err);
79-
console.log(JSON.stringify(messages, null, 2));
80-
assert.deepEqual(messages[0].data, message);
81-
done();
78+
setTimeout(() => {
79+
subscription.pull((err, messages) => {
80+
assert.ifError(err);
81+
assert.deepEqual(messages[0].data, message);
82+
done();
83+
});
84+
}, 2000);
85+
});
86+
});
87+
88+
it(`should publish ordered messages`, (done) => {
89+
const topics = require('../topics');
90+
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
91+
assert.ifError(err);
92+
topics.publishOrderedMessage(topicName, message.data, () => {
93+
setTimeout(() => {
94+
subscription.pull((err, messages) => {
95+
assert.ifError(err);
96+
assert.equal(messages[0].data, message.data);
97+
assert.equal(messages[0].attributes.orderId, '1');
98+
topics.publishOrderedMessage(topicName, message.data, () => {
99+
setTimeout(() => {
100+
subscription.pull((err, messages) => {
101+
assert.ifError(err);
102+
assert.equal(messages[0].data, message.data);
103+
assert.equal(messages[0].attributes.orderId, '2');
104+
done();
105+
});
106+
}, 2000);
107+
});
108+
});
109+
}, 2000);
82110
});
83111
});
84112
});

pubsub/test/subscriptions.test.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ describe(`pubsub:subscriptions`, () => {
5252
program.listSubscriptions(callback);
5353
program.listTopicSubscriptions(topicName, callback);
5454
program.pullMessages(subscriptionName, callback);
55+
program.pullOrderedMessages(subscriptionName, callback);
5556
program.getSubscriptionPolicy(subscriptionName, callback);
5657
program.setSubscriptionPolicy(subscriptionName, callback);
5758
program.testSubscriptionPermissions(subscriptionName, callback);
5859

59-
assert.equal(callback.callCount, 10);
60+
assert.equal(callback.callCount, 11);
6061
assert.equal(callback.alwaysCalledWithExactly(error), true);
6162
});
6263
});

pubsub/test/topics.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ describe(`pubsub:topics`, () => {
4848
program.setTopicPolicy(topicName, callback);
4949
program.testTopicPermissions(topicName, callback);
5050

51-
assert.equal(callback.callCount, 7);
51+
assert.equal(callback.callCount, 8);
5252
assert.equal(callback.alwaysCalledWithExactly(error), true);
5353
});
5454
});

0 commit comments

Comments
 (0)