Skip to content

Commit ad13b1e

Browse files
committed
support rabbitmq
1 parent b46e575 commit ad13b1e

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
"update": "ncu -u -a"
2929
},
3030
"dependencies": {
31+
"amqplib": "^0.5.1",
32+
"amqplib-rpc": "^2.0.4",
3133
"blueimp-md5": "^2.7.0",
3234
"body-parser": "~1.17.1",
35+
"bunnymq": "^2.2.2",
3336
"compression": "^1.6.2",
3437
"cookie-parser": "~1.4.3",
3538
"cors": "^2.8.1",
@@ -53,7 +56,8 @@
5356
"request-promise": "^4.1.1",
5457
"sequelize": "^3.30.2",
5558
"serve-favicon": "~2.4.1",
56-
"sqlite3": "^3.1.8"
59+
"sqlite3": "^3.1.8",
60+
"thrift": "^0.10.0"
5761
},
5862
"devDependencies": {
5963
"browser-sync": "^2.18.8",

server/utils/rabbitmq.js

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* 消息队列,简单实现了消息订阅、发布和RPC调用
3+
* 也可参考 https://github.com/tjmehta/amqplib-rpc 实现方式
4+
*/
5+
6+
var bunnymq = require('bunnymq')
7+
8+
var config = {
9+
host: 'amqp://localhost', // connect url
10+
prefetch: 5, // number of fetched messages at once on the channel
11+
requeue: true, // requeue put back message into the broker if consumer crashes/trigger exception
12+
timeout: 1000, // time between two reconnect (ms)
13+
rpcTimeout: 1000 // default timeout for RPC calls. If set to '0' there will be none.
14+
}
15+
16+
var conn = bunnymq(config)
17+
var producer = conn.producer
18+
var consumer = conn.consumer
19+
20+
/**
21+
* Producer (publisher), can send messages to a named queue.
22+
*/
23+
exports.publish = function (queueName, content) {
24+
producer.produce(queueName, content)
25+
}
26+
27+
/**
28+
* Consumer (subscriber), can handle messages from a named queue.
29+
*/
30+
exports.subscribe = function (queueName, callback) {
31+
consumer.consume(queueName, callback)
32+
}
33+
34+
/**
35+
* RPC invoking
36+
* @param {String} queueName 队列名
37+
* @param {Object} content 传递的参数
38+
* @param {Function} callback 回调函数,参数为返回值
39+
*/
40+
exports.rpcCall = function (queueName, content, callback) {
41+
producer.produce(queueName, content, { rpc: true }).then(callback)
42+
}
43+
44+
/**
45+
* RPC provider
46+
* @param {String} queueName 队列名
47+
* @param {Function} handler 处理函数,其返回值可以是普通数据类型或Promise对象
48+
*/
49+
exports.rpcReceive = function (queueName, handler) {
50+
consumer.consume(queueName, handler)
51+
}

0 commit comments

Comments
 (0)