Skip to content

Commit 51b86a2

Browse files
committed
Added Pub/Sub samples.
1 parent add9bf6 commit 51b86a2

File tree

4 files changed

+347
-0
lines changed

4 files changed

+347
-0
lines changed

pubsub/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
## Pub/Sub Samples
2+
3+
These samples require two environment variables to be set:
4+
5+
- `GOOGLE_APPLICATION_CREDENTIALS` - Path to a service account file. You can
6+
download one from your Google project's "permissions" page.
7+
- `TEST_PROJECT_ID` - Id of your Google project.
8+
9+
## Run a sample
10+
11+
Install dependencies:
12+
13+
npm install
14+
15+
To print available commands:
16+
17+
npm run
18+
19+
Execute a sample:
20+
21+
npm run <sample>
22+
23+
Example:
24+
25+
npm run write

pubsub/iam.js

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2016, Google, Inc.
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
'use strict';
15+
16+
var async = require('async');
17+
var utils = require('./subscription');
18+
var createTopic = utils.createTopic;
19+
var subscribe = utils.subscribe;
20+
21+
// [START get_topic_policy]
22+
function getTopicPolicy(topic, callback) {
23+
// Retrieve the IAM policy for the provided topic
24+
topic.iam.getPolicy(callback);
25+
}
26+
// [END get_topic_policy]
27+
28+
// [START get_subscription_policy]
29+
function getSubscriptionPolicy(subscription, callback) {
30+
// Retrieve the IAM policy for the provided subscription
31+
subscription.iam.getPolicy(callback);
32+
}
33+
// [END get_subscription_policy]
34+
35+
exports.getTopicPolicy = getTopicPolicy;
36+
exports.getSubscriptionPolicy = getSubscriptionPolicy;
37+
38+
if (module === require.main) {
39+
var _subscription;
40+
var _topic;
41+
async.waterfall([
42+
function (cb) {
43+
console.log('create topic...');
44+
createTopic(cb);
45+
},
46+
function (topic, apiResponse, cb) {
47+
_topic = topic;
48+
console.log('created topic');
49+
console.log('get topic IAM policy...');
50+
getTopicPolicy(topic, cb);
51+
},
52+
function (policy, apiResponse, cb) {
53+
console.log('got topic policy', policy);
54+
console.log('create subscription...');
55+
subscribe(cb);
56+
},
57+
function (subscription, apiResponse, cb) {
58+
_subscription = subscription;
59+
console.log('created subscription');
60+
console.log('get subscription IAM policy...');
61+
getSubscriptionPolicy(subscription, cb);
62+
},
63+
function (policy, apiResponse, cb) {
64+
console.log('got subscription policy', policy);
65+
console.log('deleting subscription...');
66+
_subscription.delete(cb);
67+
},
68+
function (apiResponse, cb) {
69+
console.log('deleted subscription');
70+
console.log('deleting topic...');
71+
_topic.delete(cb);
72+
}
73+
], function (err) {
74+
if (err) {
75+
console.error(err);
76+
} else {
77+
console.log('deleted topic');
78+
}
79+
});
80+
}

pubsub/package.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "nodejs-docs-samples-pubsub",
3+
"description": "Node.js samples for Google Cloud Pub/Sub.",
4+
"version": "0.0.1",
5+
"private": true,
6+
"license": "Apache Version 2.0",
7+
"engines": {
8+
"node": ">=0.10.x"
9+
},
10+
"scripts": {
11+
"iam": "node iam.js",
12+
"subscription": "node subscription.js"
13+
},
14+
"dependencies": {
15+
"gcloud": "^0.27.0"
16+
}
17+
}

pubsub/subscription.js

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// Copyright 2016, Google, Inc.
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
'use strict';
15+
16+
var async = require('async');
17+
18+
// [START auth]
19+
// You must set these environment variables to run this sample
20+
var projectId = process.env.TEST_PROJECT_ID;
21+
var keyFilename = process.env.GOOGLE_APPLICATION_CREDENTIALS;
22+
23+
// If you don't set the environment variables, then you can modify this file
24+
// to set the values
25+
projectId = projectId || '<your-project-id>';
26+
keyFilename = keyFilename || '/path/to/keyfile.json';
27+
28+
// [START require]
29+
// Provide projectId and authentication to gcloud
30+
var gcloud = require('gcloud')({
31+
projectId: projectId,
32+
keyFilename: keyFilename
33+
});
34+
35+
// Get a reference to the pubsub component
36+
var pubsub = gcloud.pubsub();
37+
// [END auth]
38+
39+
// [START create_topic]
40+
function createTopic(callback) {
41+
var topicName = 'messageCenter';
42+
43+
var topic = pubsub.topic(topicName);
44+
45+
// Get the topic if it exists. Create it if it does not exist.
46+
topic.get({
47+
autoCreate: true
48+
}, callback);
49+
}
50+
// [END create_topic]
51+
52+
// [START publish]
53+
function publish(callback) {
54+
var topicName = 'messageCenter';
55+
56+
// Grab a reference to an existing topic
57+
var topic = pubsub.topic(topicName);
58+
59+
// Publish a message to the topic
60+
topic.publish({
61+
data: 'Hello, world!'
62+
}, callback);
63+
}
64+
// [END publish]
65+
66+
// [START list_topics]
67+
function getAllTopics(callback) {
68+
// Grab paginated topics
69+
pubsub.getTopics(function (err, topics, nextQuery) {
70+
// Quit on error
71+
if (err) {
72+
return callback(err);
73+
}
74+
// There is another page of topics
75+
if (nextQuery) {
76+
// Grab the remaining pages of topics recursively
77+
return getAllTopics(function (err, _topics) {
78+
if (_topics) {
79+
topics = topics.concat(_topics);
80+
}
81+
callback(err, topics);
82+
});
83+
}
84+
// Last page of topics
85+
return callback(err, topics);
86+
});
87+
}
88+
// [END list_topics]
89+
90+
// [START get_all_subscriptions]
91+
function getAllSubscriptions(callback) {
92+
// Grab paginated subscriptions
93+
pubsub.getSubscriptions(function (err, subscriptions, nextQuery) {
94+
// Quit on error
95+
if (err) {
96+
return callback(err);
97+
}
98+
// There is another page of subscriptions
99+
if (nextQuery) {
100+
// Grab the remaining pages of subscriptions recursively
101+
return getAllSubscriptions(function (err, _subscriptions) {
102+
if (_subscriptions) {
103+
subscriptions = subscriptions.concat(_subscriptions);
104+
}
105+
callback(err, subscriptions);
106+
});
107+
}
108+
// Last page of subscriptions
109+
return callback(err, subscriptions);
110+
});
111+
}
112+
// [END get_all_subscriptions]
113+
114+
// [START create_subscription]
115+
function subscribe(callback) {
116+
var topicName = 'messageCenter';
117+
var subscriptionName = 'newMessages';
118+
119+
var options = {
120+
reuseExisting: true
121+
};
122+
pubsub.subscribe(topicName, subscriptionName, options, callback);
123+
}
124+
// [END create_subscription]
125+
126+
// [START pull_messages]
127+
function pullMessages(callback) {
128+
// Create a topic
129+
createTopic(function (err) {
130+
if (err) {
131+
return callback(err);
132+
}
133+
// Create a subscription to the topic
134+
subscribe(function (err, subscription) {
135+
if (err) {
136+
return callback(err);
137+
}
138+
var options = {
139+
// Limit the amount of messages pulled.
140+
maxResults: 100,
141+
// If set, the system will respond immediately. Otherwise, wait until
142+
// new messages are available. Returns if timeout is reached.
143+
returnImmediately: false
144+
};
145+
// Pull any messages on the subscription
146+
subscription.pull(options, function (err, messages) {
147+
if (err) {
148+
return callback(err);
149+
}
150+
151+
// Do something with messages here?
152+
153+
// Acknowledge messages
154+
subscription.ack(messages.map(function (message) {
155+
return message.ackId;
156+
}), function (err) {
157+
if (err) {
158+
return callback(err);
159+
}
160+
callback(null, messages);
161+
});
162+
});
163+
});
164+
});
165+
}
166+
// [END pull_messages]
167+
168+
exports.createTopic = createTopic;
169+
exports.publish = publish;
170+
exports.getAllTopics = getAllTopics;
171+
exports.getAllSubscriptions = getAllSubscriptions;
172+
exports.pullMessages = pullMessages;
173+
exports.subscribe = subscribe;
174+
exports.pubsub = pubsub;
175+
176+
if (module === require.main) {
177+
var _subscription;
178+
var _topic;
179+
async.waterfall([
180+
function (cb) {
181+
console.log('create topic...');
182+
createTopic(cb);
183+
},
184+
function (topic, apiResponse, cb) {
185+
_topic = topic;
186+
console.log('created topic');
187+
console.log('create subscription...');
188+
subscribe(cb);
189+
},
190+
function (subscription, apiResponse, cb) {
191+
_subscription = subscription;
192+
console.log('created subscription');
193+
console.log('list all subscriptions...');
194+
getAllSubscriptions(cb);
195+
},
196+
function (subscriptions, cb) {
197+
console.log('got all subscriptions');
198+
console.log('publishing a message...');
199+
publish(cb);
200+
},
201+
function (messageIds, apiResponse, cb) {
202+
console.log('published message');
203+
console.log('pulling messages...');
204+
pullMessages(cb);
205+
},
206+
function (messages, cb) {
207+
console.log('got messages', messages.map(function (message) {
208+
return message.data;
209+
}));
210+
console.log('deleting subscription...');
211+
_subscription.delete(cb);
212+
},
213+
function (apiResponse, cb) {
214+
console.log('deleted subscription');
215+
console.log('deleting topic...');
216+
_topic.delete(cb);
217+
}
218+
], function (err) {
219+
if (err) {
220+
console.error(err);
221+
} else {
222+
console.log('deleted topic');
223+
}
224+
});
225+
}

0 commit comments

Comments
 (0)