Skip to content

Commit 18d2365

Browse files
committed
switch break statements are important and easy to forget.. mutate subscriptions broken but not far from fixed
1 parent f1f1658 commit 18d2365

File tree

7 files changed

+163
-83
lines changed

7 files changed

+163
-83
lines changed

dst/client.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ export class Client {
8484
});
8585
return;
8686
}
87+
else if (json.response.type === "sub-inst") {
88+
this.walkSubscribers(json.response.topic, undefined, (_cb) => {
89+
_cb(json.response.id, undefined, true);
90+
});
91+
return;
92+
}
8793
const { resolve, reject } = this.responseResolvers.get(json.id);
8894
if (resolve) {
8995
//if we did, json.response is our answer and we stop listening
@@ -124,15 +130,18 @@ export class Client {
124130
return res;
125131
}
126132
subscribe(topic, cb) {
127-
let cfg = topic;
133+
let cfg = undefined;
128134
if (typeof (topic) === "string") {
129-
// cfg.onlyDeliverDeltas = false;
135+
cfg = {
136+
topic
137+
};
130138
}
131139
else {
132140
cfg = topic;
133141
topic = cfg.topic;
134142
}
135143
this.addSubscriber(topic, cfg.id, cb);
144+
console.log("sending sub to server", cfg);
136145
return this.sendMessage("sub", cfg);
137146
}
138147
unsubscribe(topic) {

dst/server.js

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { createServer } from "http";
22
import serveHandler from "serve-handler";
33
import { server as WebSocketServer } from "websocket";
44
import { watchFile } from "fs";
5-
import { resolve as pathResolve } from "path";
65
function originIsAllowed(origin) {
76
return true;
87
}
@@ -16,8 +15,6 @@ async function main() {
1615
return mod.auth;
1716
}
1817
let authFunc = await loadAuthFunc();
19-
const currentDirectory = pathResolve('.');
20-
console.log("Current dir (aka . ) is", currentDirectory);
2118
watchFile("./dst/auth.js", {
2219
persistent: true,
2320
interval: 4000
@@ -31,7 +28,7 @@ async function main() {
3128
}
3229
});
3330
httpServer = createServer((req, res) => {
34-
console.log(req.url);
31+
// console.log(req.url);
3532
if (req.url.startsWith("/api")) {
3633
res.writeHead(200, "success", {
3734
"Content-Type": "application/json"
@@ -83,19 +80,26 @@ async function main() {
8380
const storage = subStorageGetOrCreate(topic);
8481
if (id !== undefined) {
8582
idSubsListGetOrCreate(storage, id).add(ws);
86-
console.log("Added sub", topic, id, ws.remoteAddress);
83+
console.log(`[sub] ${topic}:${id} -> client`);
8784
}
8885
else {
8986
storage.topicSubscribers.add(ws);
90-
console.log("Added sub", topic, ws.remoteAddress);
87+
console.log(`[sub] ${topic} -> client`);
9188
}
9289
}
9390
function walkSubscribers(topic, id = undefined, cb) {
9491
const storage = subStorageGetOrCreate(topic);
95-
let list = id === undefined ?
96-
storage.topicSubscribers :
97-
idSubsListGetOrCreate(storage, id);
98-
for (const ws of list) {
92+
//call ID specific listeners first if present
93+
if (id !== undefined) {
94+
const idSubs = storage.idSubScribers.get(id);
95+
if (idSubs) {
96+
for (const ws of idSubs) {
97+
cb(ws);
98+
}
99+
}
100+
}
101+
//call topic subscribers always if present
102+
for (const ws of storage.topicSubscribers) {
99103
cb(ws);
100104
}
101105
}
@@ -137,23 +141,25 @@ async function main() {
137141
res.error = `Invalid auth to create schema`;
138142
}
139143
else {
140-
console.log("schema-set", topic);
144+
console.log(`[schema] created "${topic}"`);
141145
schemas.set(topic, {
142146
shape,
143147
instances: new Map()
144148
});
145149
}
146150
}
147151
break;
148-
case "schema-get": {
149-
let { topic } = req.msg;
150-
const schema = schemas.get(topic);
151-
if (!schema) {
152-
res.error = "no schema for topic";
153-
break;
152+
case "schema-get":
153+
{
154+
let { topic } = req.msg;
155+
const schema = schemas.get(topic);
156+
if (!schema) {
157+
res.error = "no schema for topic";
158+
break;
159+
}
160+
res.response.shape = schema.shape;
154161
}
155-
res.response.shape = schema.shape;
156-
}
162+
break;
157163
case "instance":
158164
{
159165
const topic = req.msg.topic;
@@ -165,7 +171,24 @@ async function main() {
165171
const instanceId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();
166172
res.response.id = instanceId;
167173
storage.instances.set(instanceId, {});
168-
// console.log("instance", instanceId);
174+
console.log("instance req", req);
175+
/**create a message to send to topic subscribers
176+
* to let them know a new instance exists
177+
*/
178+
const subInstRes = {
179+
response: {
180+
type: "sub-inst",
181+
topic,
182+
id: instanceId
183+
},
184+
id: -1
185+
};
186+
const subInstResStr = JSON.stringify(subInstRes);
187+
console.log(`[schema] instanced "${topic}:${instanceId}"`);
188+
walkSubscribers(topic, undefined, (ws) => {
189+
console.log("Notifying", ws.remoteAddress, "of instance", instanceId);
190+
ws.send(subInstResStr);
191+
});
169192
}
170193
break;
171194
case "mut":
@@ -210,11 +233,9 @@ async function main() {
210233
},
211234
id: -1
212235
};
213-
// console.log("received mutate", change);
214236
const subResStr = JSON.stringify(subRes);
215237
walkSubscribers(topic, id, (ws) => {
216238
ws.send(subResStr);
217-
// console.log("Send mut to", subResStr, ws.remoteAddress);
218239
});
219240
}
220241
break;
@@ -228,17 +249,19 @@ async function main() {
228249
res.error = "unsub is not impl yet";
229250
break;
230251
case "list":
231-
const topic = req.msg.topic;
232-
const storage = schemas.get(topic);
233-
if (!storage) {
234-
res.error = `schema for topic was not found`;
235-
break;
252+
{
253+
const topic = req.msg.topic;
254+
const storage = schemas.get(topic);
255+
if (!storage) {
256+
res.error = `schema for topic was not found`;
257+
break;
258+
}
259+
const list = {};
260+
storage.instances.forEach((v, k) => {
261+
list[k] = v;
262+
});
263+
res.response.list = list;
236264
}
237-
const list = {};
238-
storage.instances.forEach((v, k) => {
239-
list[k] = v;
240-
});
241-
res.response.list = list;
242265
break;
243266
}
244267
let str = JSON.stringify(res);

dst/test.js

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ async function main() {
2424
//wait for connection
2525
await client.connect();
2626
//wait for authentication
27-
const a = await client.authenticate({ apiKey }); //not impl yet
28-
console.log("Authenticated", a);
27+
await client.authenticate({ apiKey }); //not impl yet
2928
//create a storage for players, will be owned by our client
3029
if (!await client.hasSchema("players")) {
3130
await client.createSchema("players", {
@@ -45,22 +44,31 @@ async function main() {
4544
// //upload our initial player data
4645
await client.mutate("players", localId, {
4746
name: prompt("Enter player name", "testbot"),
48-
x: 1,
49-
y: 2
47+
x: 0.5,
48+
y: 0.5
5049
});
5150
const players = new Map();
5251
const insts = await client.listInstances("players");
5352
const { list } = insts.response;
53+
function addPlayer(id, data) {
54+
players.set(id, data);
55+
}
5456
for (const id in list) {
55-
console.log("Player found by id", id);
5657
const p = list[id];
57-
players.set(id, p);
58-
client.subscribe({ topic: "players", id }, (pid, change) => {
59-
const original = players.get(id);
60-
Object.assign(original, change);
61-
// console.log("Player", id, "updated", change);
62-
});
58+
addPlayer(id, p);
59+
console.log("Player identified in list", id);
6360
}
61+
await client.subscribe("players", (pid, change, isNewInstance) => {
62+
console.log("sub msg");
63+
if (isNewInstance) {
64+
console.log("Player instanced", pid);
65+
addPlayer(pid, { x: 0.5, y: 0.5, name: "" });
66+
}
67+
else {
68+
const original = players.get(pid);
69+
Object.assign(original, change);
70+
}
71+
});
6472
const mouseMoveDebounce = {
6573
timeLast: 0,
6674
timeWait: 50
@@ -72,7 +80,6 @@ async function main() {
7280
x: x / window.innerWidth,
7381
y: y / window.innerHeight
7482
});
75-
// console.log("Sending mutate");
7683
};
7784
window.addEventListener("mousemove", (evt) => {
7885
handlePointerMove(evt.clientX, evt.clientY);

src/auth.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { MsgReq } from "./common";
2-
31

2+
import type { MsgReq } from "./common";
43

54
export async function auth (msg: MsgReq<any>): Promise<string> {
65
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();

src/client.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export interface ResolveReject<T> {
1414
}
1515

1616
export interface SubCb<T> {
17-
(id: string, change: T): void;
17+
(id: string, change?: T, isNewInstance?: true): void;
1818
}
1919
export type TopicId = string;
2020
export type InstanceId = string;
@@ -134,6 +134,15 @@ export class Client {
134134
}
135135
);
136136
return;
137+
} else if (json.response.type === "sub-inst") {
138+
this.walkSubscribers(
139+
json.response.topic,
140+
undefined,
141+
(_cb)=>{
142+
_cb(json.response.id, undefined, true);
143+
}
144+
);
145+
return;
137146
}
138147

139148
const {resolve, reject} = this.responseResolvers.get(json.id);
@@ -179,14 +188,17 @@ export class Client {
179188
return res;
180189
}
181190
subscribe<InstanceType> (topic: string|SubConfig, cb: SubCb<InstanceType>) {
182-
let cfg = topic as SubConfig;
191+
let cfg = undefined;
183192
if (typeof(topic) === "string") {
184-
// cfg.onlyDeliverDeltas = false;
193+
cfg = {
194+
topic
195+
};
185196
} else {
186197
cfg = topic;
187198
topic = cfg.topic as string;
188199
}
189200
this.addSubscriber(topic, cfg.id, cb as any);
201+
console.log("sending sub to server", cfg);
190202
return this.sendMessage("sub", cfg);
191203
}
192204
unsubscribe (topic: string) {

0 commit comments

Comments
 (0)