Skip to content

Commit e5be4de

Browse files
committed
Address comments.
1 parent d883ec2 commit e5be4de

File tree

11 files changed

+109
-97
lines changed

11 files changed

+109
-97
lines changed

bigquery/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START bigquery_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google BigQuery
1918
const bigquery = require('@google-cloud/bigquery')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

datastore/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START datastore_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google Cloud Datastore
1918
const datastore = require('@google-cloud/datastore')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

logging/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START logging_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Stackdriver Logging
1918
const logging = require('@google-cloud/logging')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

pubsub/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"yargs": "^5.0.0"
1414
},
1515
"devDependencies": {
16+
"async": "^2.0.1",
1617
"mocha": "^3.0.2",
1718
"node-uuid": "^1.4.7"
1819
},

pubsub/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START pubsub_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google Cloud Pub/Sub
1918
const pubsub = require('@google-cloud/pubsub')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

pubsub/subscriptions.js

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,28 +216,30 @@ function pullOrderedMessages (subscriptionName, callback) {
216216
return;
217217
}
218218

219+
// Pub/Sub messages are unordered, so here we manually order messages by
220+
// their "counterId" attribute which was set when they were published.
219221
messages.forEach((message) => {
220-
outstandingMessages[message.attributes.orderId] = message;
222+
outstandingMessages[message.attributes.counterId] = message;
221223
});
222224

223-
const outstandingIds = Object.keys(outstandingMessages).map((orderId) => +orderId);
225+
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId);
224226
outstandingIds.sort();
225227

226-
outstandingIds.forEach((orderId) => {
228+
outstandingIds.forEach((counterId) => {
227229
const counter = getSubscribeCounterValue();
228-
const message = outstandingMessages[orderId];
230+
const message = outstandingMessages[counterId];
229231

230-
if (orderId < counter) {
232+
if (counterId < counter) {
231233
// The message has already been processed
232234
subscription.ack(message.ackId);
233-
delete outstandingMessages[orderId];
234-
} else if (orderId === counter) {
235+
delete outstandingMessages[counterId];
236+
} else if (counterId === counter) {
235237
// Process the message
236238
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
237239

238-
setSubscribeCounterValue(orderId + 1);
240+
setSubscribeCounterValue(counterId + 1);
239241
subscription.ack(message.ackId);
240-
delete outstandingMessages[orderId];
242+
delete outstandingMessages[counterId];
241243
} else {
242244
// Have not yet processed the message on which this message is dependent
243245
return false;

pubsub/system-test/subscriptions.test.js

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
'use strict';
1515

16+
const async = require(`async`);
1617
const pubsub = require(`@google-cloud/pubsub`)();
1718
const uuid = require(`node-uuid`);
1819
const path = require(`path`);
@@ -115,40 +116,43 @@ describe(`pubsub:subscriptions`, () => {
115116
it(`should pull ordered messages`, (done) => {
116117
const subscriptions = require('../subscriptions');
117118
const expected = `Hello, world!`;
118-
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '3' } }, (err, firstMessageIds) => {
119-
assert.ifError(err);
120-
setTimeout(() => {
121-
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
122-
assert.ifError(err);
123-
assert.equal(console.log.callCount, 0);
124-
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err, secondMessageIds) => {
125-
assert.ifError(err);
126-
setTimeout(() => {
127-
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
128-
assert.ifError(err);
129-
assert.equal(console.log.callCount, 1);
130-
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, secondMessageIds[0], expected, { orderId: '1' }]);
131-
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '1' } }, (err) => {
132-
assert.ifError(err);
133-
pubsub.topic(topicName).publish({ data: expected, attributes: { orderId: '2' } }, (err, thirdMessageIds) => {
134-
assert.ifError(err);
135-
setTimeout(() => {
136-
subscriptions.pullOrderedMessages(subscriptionNameOne, (err) => {
137-
assert.ifError(err);
138-
assert.equal(console.log.callCount, 3);
139-
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, thirdMessageIds[0], expected, { orderId: '2' }]);
140-
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, firstMessageIds[0], expected, { orderId: '3' }]);
141-
done();
142-
});
143-
}, 2000);
144-
});
145-
});
146-
});
147-
}, 2000);
148-
});
149-
});
150-
}, 2000);
151-
});
119+
const publishedMessageIds = [];
120+
121+
async.waterfall([
122+
(cb) => {
123+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb);
124+
},
125+
(messageIds, apiResponse, cb) => {
126+
publishedMessageIds.push(messageIds[0]);
127+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
128+
},
129+
(cb) => {
130+
assert.equal(console.log.callCount, 0);
131+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
132+
},
133+
(messageIds, apiResponse, cb) => {
134+
publishedMessageIds.push(messageIds[0]);
135+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
136+
},
137+
(cb) => {
138+
assert.equal(console.log.callCount, 1);
139+
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
140+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
141+
},
142+
(messageIds, apiResponse, cb) => {
143+
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb);
144+
},
145+
(messageIds, apiResponse, cb) => {
146+
publishedMessageIds.push(messageIds[0]);
147+
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
148+
},
149+
(cb) => {
150+
assert.equal(console.log.callCount, 3);
151+
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
152+
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
153+
cb();
154+
}
155+
], done);
152156
});
153157

154158
it(`should set the IAM policy for a subscription`, (done) => {

pubsub/system-test/topics.test.js

Lines changed: 54 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
'use strict';
1515

16+
const async = require(`async`);
1617
const pubsub = require(`@google-cloud/pubsub`)();
1718
const uuid = require(`node-uuid`);
1819
const path = require(`path`);
@@ -58,57 +59,66 @@ describe(`pubsub:topics`, () => {
5859
});
5960

6061
it(`should publish a simple message`, (done) => {
61-
pubsub.topic(topicName).subscribe(subscriptionName, (err, subscription) => {
62-
assert.ifError(err);
63-
run(`${cmd} publish ${topicName} "${message.data}"`, cwd);
64-
setTimeout(() => {
65-
subscription.pull((err, messages) => {
66-
assert.ifError(err);
67-
assert.equal(messages[0].data, message.data);
68-
done();
69-
});
70-
}, 2000);
71-
});
62+
async.waterfall([
63+
(cb) => {
64+
pubsub.topic(topicName).subscribe(subscriptionName, cb);
65+
},
66+
(subscription, apiResponse, cb) => {
67+
run(`${cmd} publish ${topicName} "${message.data}"`, cwd);
68+
setTimeout(() => subscription.pull(cb), 2000);
69+
},
70+
(messages, apiResponse, cb) => {
71+
assert.equal(messages[0].data, message.data);
72+
cb();
73+
}
74+
], done);
7275
});
7376

7477
it(`should publish a JSON message`, (done) => {
75-
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
76-
assert.ifError(err);
77-
run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd);
78-
setTimeout(() => {
79-
subscription.pull((err, messages) => {
80-
assert.ifError(err);
81-
assert.deepEqual(messages[0].data, message);
82-
done();
83-
});
84-
}, 2000);
85-
});
78+
async.waterfall([
79+
(cb) => {
80+
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb);
81+
},
82+
(subscription, apiResponse, cb) => {
83+
run(`${cmd} publish ${topicName} '${JSON.stringify(message)}'`, cwd);
84+
setTimeout(() => subscription.pull(cb), 2000);
85+
},
86+
(messages, apiResponse, cb) => {
87+
assert.deepEqual(messages[0].data, message);
88+
cb();
89+
}
90+
], done);
8691
});
8792

8893
it(`should publish ordered messages`, (done) => {
8994
const topics = require('../topics');
90-
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, (err, subscription) => {
91-
assert.ifError(err);
92-
topics.publishOrderedMessage(topicName, message.data, () => {
93-
setTimeout(() => {
94-
subscription.pull((err, messages) => {
95-
assert.ifError(err);
96-
assert.equal(messages[0].data, message.data);
97-
assert.equal(messages[0].attributes.orderId, '1');
98-
topics.publishOrderedMessage(topicName, message.data, () => {
99-
setTimeout(() => {
100-
subscription.pull((err, messages) => {
101-
assert.ifError(err);
102-
assert.equal(messages[0].data, message.data);
103-
assert.equal(messages[0].attributes.orderId, '2');
104-
done();
105-
});
106-
}, 2000);
107-
});
108-
});
109-
}, 2000);
110-
});
111-
});
95+
let subscription;
96+
97+
async.waterfall([
98+
(cb) => {
99+
pubsub.topic(topicName).subscribe(subscriptionName, { reuseExisting: true }, cb);
100+
},
101+
(_subscription, apiResponse, cb) => {
102+
subscription = _subscription;
103+
topics.publishOrderedMessage(topicName, message.data, cb);
104+
},
105+
(cb) => {
106+
setTimeout(() => subscription.pull(cb), 2000);
107+
},
108+
(messages, apiResponse, cb) => {
109+
assert.equal(messages[0].data, message.data);
110+
assert.equal(messages[0].attributes.counterId, '1');
111+
topics.publishOrderedMessage(topicName, message.data, cb);
112+
},
113+
(cb) => {
114+
setTimeout(() => subscription.pull(cb), 2000);
115+
},
116+
(messages, apiResponse, cb) => {
117+
assert.equal(messages[0].data, message.data);
118+
assert.equal(messages[0].attributes.counterId, '2');
119+
topics.publishOrderedMessage(topicName, message.data, cb);
120+
}
121+
], done);
112122
});
113123

114124
it(`should set the IAM policy for a topic`, (done) => {

pubsub/topics.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,10 @@ function publishOrderedMessage (topicName, data, callback) {
147147
const message = {
148148
data: data,
149149

150-
// Assign an order id to the message
150+
// Pub/Sub messages are unordered, so assign an order id to the message to
151+
// manually order messages
151152
attributes: {
152-
orderId: '' + getPublishCounterValue()
153+
counterId: '' + getPublishCounterValue()
153154
}
154155
};
155156

@@ -160,7 +161,7 @@ function publishOrderedMessage (topicName, data, callback) {
160161
}
161162

162163
// Update the counter value
163-
setPublishCounterValue(+message.attributes.orderId + 1);
164+
setPublishCounterValue(+message.attributes.counterId + 1);
164165

165166
console.log(`Message ${messageIds[0]} published.`);
166167
callback();

storage/quickstart.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
// [START storage_quickstart]
1717
// Imports and instantiates the Google Cloud client library
18-
// for Google Cloud Storage
1918
const storage = require('@google-cloud/storage')({
2019
projectId: 'YOUR_PROJECT_ID'
2120
});

0 commit comments

Comments
 (0)