Skip to content

Commit 8ae2481

Browse files
committed
added support for the scale with meteor cluster
added supporting functions and hooks to meteor cluster to get simply plugged in
1 parent b3da04f commit 8ae2481

File tree

5 files changed

+109
-56
lines changed

5 files changed

+109
-56
lines changed

laika/tests/scaling_support.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
var assert = require('assert');
2+
require('./common');
3+
4+
suite('Scaling Support', function() {
5+
test('trigger a client using .emitToSubscriptions()', function(done, server, client) {
6+
server.evalSync(createServerStream, 'hello');
7+
client.evalSync(createClientStream, 'hello');
8+
9+
client.evalSync(function() {
10+
helloStream.on('evt', function(data) {
11+
emit('evt', data, this);
12+
});
13+
emit('return');
14+
});
15+
16+
client.on('evt', function(data, context) {
17+
assert.deepEqual(data, {a: 10});
18+
assert.deepEqual(context, {subscriptionId: 'subscriptionId', userId: 'userId'});
19+
done();
20+
});
21+
22+
server.evalSync(function() {
23+
helloStream.emitToSubscriptions(['evt', {a: 10}], 'subscriptionId', 'userId');
24+
});
25+
});
26+
27+
test('listen to all events with firehose', function(done, server, client) {
28+
server.evalSync(createServerStream, 'hello');
29+
client.evalSync(createClientStream, 'hello');
30+
31+
client.evalSync(laika.actions.createUser, {username: 'arunoda', password: 'password'});
32+
var user = client.evalSync(laika.actions.loggedInUser);
33+
34+
server.evalSync(function() {
35+
helloStream.firehose = function(args, subscriptionId, userId) {
36+
emit('firehose', args, subscriptionId, userId);
37+
};
38+
emit('return');
39+
});
40+
41+
server.on('firehose', function(args, subscriptionId, userId) {
42+
assert.deepEqual(args, ['evt', {a: 100}]);
43+
assert.equal(userId, user._id);
44+
assert.ok(subscriptionId);
45+
done();
46+
});
47+
48+
client.evalSync(function() {
49+
helloStream.emit('evt', {a: 100});
50+
emit('return');
51+
});
52+
});
53+
});

lib/server.js

Lines changed: 11 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var Fibers = Npm.require('fibers');
44

55
Meteor.Stream = function Stream(name) {
66
EV.call(this);
7+
78
var self = this;
89
var streamName = 'stream-' + name;
910
var allowFunction;
@@ -18,22 +19,18 @@ Meteor.Stream = function Stream(name) {
1819

1920
self._emit = self.emit;
2021
self.emit = function emit() {
21-
var args = [];
22-
for(var key in arguments) {
23-
args.push(arguments[key]);
24-
}
25-
emitToSubscriptions(args, null, null);
22+
self.emitToSubscriptions(arguments, null, null);
2623
};
2724

2825
self.permissions = new Meteor.Stream.Permission(Meteor.Collection.insecure, true);
2926

30-
self.addFilter = function(callback) {
27+
self.addFilter = function addFilter(callback) {
3128
filters.push(callback);
3229
};
3330

34-
function emitToSubscriptions(args, subscriptionId, userId) {
31+
self.emitToSubscriptions = function emitToSubscriptions(args, subscriptionId, userId) {
3532
events.emit('item', {args: args, userId: userId, subscriptionId: subscriptionId});
36-
}
33+
};
3734

3835
Meteor.publish(streamName, function() {
3936
var subscriptionId = Random.id();
@@ -81,7 +78,11 @@ Meteor.Stream = function Stream(name) {
8178
if(methodContext.allowed) {
8279
//apply filters
8380
args = applyFilters(args, methodContext);
84-
emitToSubscriptions(args, subscriptionId, methodContext.userId);
81+
self.emitToSubscriptions(args, subscriptionId, methodContext.userId);
82+
//send to firehose if exists
83+
if(self.firehose) {
84+
self.firehose(args, subscriptionId, methodContext.userId);
85+
}
8586
}
8687
//need to send this to server always
8788
self._emit.apply(methodContext, args);
@@ -105,46 +106,4 @@ Meteor.Stream = function Stream(name) {
105106
}
106107
};
107108

108-
util.inherits(Meteor.Stream, EV);
109-
110-
Meteor.Stream.Permission = function (acceptAll, cacheAll) {
111-
112-
var options = {
113-
"read": {
114-
results: {}
115-
},
116-
"write": {
117-
results: {}
118-
}
119-
};
120-
121-
this.read = function(func, cache) {
122-
options['read']['func'] = func;
123-
options['read']['doCache'] = (cache === undefined)? cacheAll: cache;
124-
};
125-
126-
this.write = function(func, cache) {
127-
options['write']['func'] = func;
128-
options['write']['doCache'] = (cache === undefined)? cacheAll: cache;
129-
};
130-
131-
this.checkPermission = function(type, userId, eventName) {
132-
var namespace = userId + '-' + eventName;
133-
var result = options[type].results[namespace];
134-
135-
if(result === undefined) {
136-
var func = options[type].func;
137-
if(func) {
138-
result = func(userId, eventName);
139-
if(options[type].doCache) {
140-
options[type].results[namespace] = result;
141-
return result;
142-
}
143-
} else {
144-
return acceptAll;
145-
}
146-
} else {
147-
return result;
148-
}
149-
};
150-
}
109+
util.inherits(Meteor.Stream, EV);

lib/stream_permission.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Meteor.Stream.Permission = function (acceptAll, cacheAll) {
2+
var options = {
3+
"read": {
4+
results: {}
5+
},
6+
"write": {
7+
results: {}
8+
}
9+
};
10+
11+
this.read = function(func, cache) {
12+
options['read']['func'] = func;
13+
options['read']['doCache'] = (cache === undefined)? cacheAll: cache;
14+
};
15+
16+
this.write = function(func, cache) {
17+
options['write']['func'] = func;
18+
options['write']['doCache'] = (cache === undefined)? cacheAll: cache;
19+
};
20+
21+
this.checkPermission = function(type, userId, eventName) {
22+
var namespace = userId + '-' + eventName;
23+
var result = options[type].results[namespace];
24+
25+
if(result === undefined) {
26+
var func = options[type].func;
27+
if(func) {
28+
result = func(userId, eventName);
29+
if(options[type].doCache) {
30+
options[type].results[namespace] = result;
31+
return result;
32+
}
33+
} else {
34+
return acceptAll;
35+
}
36+
} else {
37+
return result;
38+
}
39+
};
40+
}

package.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ Package.describe({
33
});
44

55
Package.on_use(function (api, where) {
6-
api.add_files(['lib/ev.js', 'lib/server.js'], 'server');
6+
api.add_files(['lib/ev.js', 'lib/server.js', 'lib/stream_permission.js'], 'server');
77
api.add_files(['lib/ev.js', '/lib/client.js'], 'client');
88
});

test_cases.todo

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ Without Permissions:
44
✔ Client to Server @done (13-06-20 15:52)
55
✔ Client to Client @done (13-06-20 15:52)
66
✔ Client to Client with logged in @done (13-06-21 16:31)
7-
☐ Server Server
87
✔ emitting not fired to me in the client @done (13-06-22 17:52)
98

10-
119
Read Permissions:
1210

1311
✔ Server to Client with denied @done (13-06-21 14:55)
@@ -34,7 +32,7 @@ Filters:
3432
✔ simple filter adding an arg @done (13-06-23 09:04)
3533
✔ multiple filters @done (13-06-23 09:06)
3634
✔ modifying an arg @done (13-06-23 09:08)
37-
filters with userId and subscriptionId
35+
filters with userId and subscriptionId @done (13-06-26 11:10)
3836

3937
onDisconnect Handlers:
4038
✔ single onDisconnect handler @done (13-06-24 14:01)
@@ -50,6 +48,9 @@ EV:
5048
✔ once and emit @done (13-06-24 15:04)
5149
✔ emit with different contexts @done (13-06-24 16:06)
5250

51+
Scaling Support:
52+
✔ firehose @done (13-06-26 15:08)
53+
✔ emitToSubscribers @done (13-06-26 15:08)
5354

5455

5556

0 commit comments

Comments
 (0)