Skip to content

Commit 2084035

Browse files
ffMathygismya
andauthored
feat: Support for async subscriber callbacks (#132)
* Update event_hub.ts Add support for async subscriber callbacks while retaining backwards compatibility * Made changes to precommits. * Global fix for precommits. * Update source/event_hub.ts Co-authored-by: Lars Johansson <gismya@gmail.com> * Update test/event_hub.test.js Co-authored-by: Lars Johansson <gismya@gmail.com> * Update source/event_hub.ts Co-authored-by: Lars Johansson <gismya@gmail.com> * Update source/event_hub.ts Co-authored-by: Lars Johansson <gismya@gmail.com> * Update event_hub.ts Incorrectly copied the fix when suggesting line by line. * Removed changes. --------- Co-authored-by: Lars Johansson <gismya@gmail.com>
1 parent 753fc42 commit 2084035

File tree

3 files changed

+69
-10
lines changed

3 files changed

+69
-10
lines changed

source/event_hub.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -557,18 +557,25 @@ export class EventHub {
557557
*/
558558
private _handle(eventPayload: EventPayload) {
559559
this.logger.debug("Event received", eventPayload);
560-
560+
const promises: Promise<any>[] = [];
561561
for (const subscriber of this._subscribers) {
562562
// TODO: Parse event target and check that it matches subscriber.
563563

564564
// TODO: Support full expression format as used in Python.
565565
if (!this._IsSubscriberInterestedIn(subscriber, eventPayload)) {
566566
continue;
567567
}
568-
569-
let response = null;
570568
try {
571-
response = subscriber.callback(eventPayload);
569+
const responsePromise = Promise.resolve(
570+
subscriber.callback(eventPayload)
571+
);
572+
promises.push(responsePromise);
573+
responsePromise.then((response) => {
574+
// Publish reply if response isn't null or undefined.
575+
if (response != null) {
576+
this.publishReply(eventPayload, response, subscriber.metadata);
577+
}
578+
});
572579
} catch (error) {
573580
this.logger.error(
574581
"Error calling subscriber for event.",
@@ -577,12 +584,8 @@ export class EventHub {
577584
eventPayload
578585
);
579586
}
580-
581-
// Publish reply if response isn't null or undefined.
582-
if (response != null) {
583-
this.publishReply(eventPayload, response, subscriber.metadata);
584-
}
585587
}
588+
return promises;
586589
}
587590

588591
/**

source/session.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ export class Session {
611611
if (reason instanceof Error) {
612612
throw this.getErrorFromResponse({
613613
exception: "NetworkError",
614-
content: (reason.cause as string) || reason.message,
614+
content: (reason["cause"] as string) || reason.message,
615615
});
616616
}
617617
throw new Error("Unknown error");

test/event_hub.test.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,60 @@ describe("EventHub", () => {
152152

153153
expect(callback).not.toHaveBeenCalledWith(testEvent);
154154
});
155+
156+
test("should handle sync callback and return correct data", async () => {
157+
const callback = vi.fn(() => "someData");
158+
const testEvent = {
159+
topic: "ftrack.test",
160+
data: {},
161+
id: "eventId",
162+
source: { id: "sourceId" },
163+
};
164+
165+
const publishReplySpy = vi
166+
.spyOn(eventHub, "publishReply")
167+
.mockImplementation((_, data) => data);
168+
169+
eventHub.subscribe("topic=ftrack.test", callback);
170+
const promises = eventHub._handle(testEvent);
171+
await Promise.all(promises);
172+
expect(callback).toHaveBeenCalledWith(testEvent);
173+
expect(publishReplySpy).toHaveBeenCalledWith(
174+
expect.anything(),
175+
"someData",
176+
expect.anything()
177+
);
178+
publishReplySpy.mockRestore();
179+
});
180+
181+
test("should not handle async callback with a promise", async () => {
182+
const asyncCallback = vi.fn(async () => "someData");
183+
const testEvent = {
184+
topic: "ftrack.test",
185+
data: {},
186+
id: "eventId",
187+
source: { id: "sourceId" },
188+
};
189+
190+
const publishReplySpy = vi
191+
.spyOn(eventHub, "publishReply")
192+
.mockImplementation((_, data) => data);
193+
194+
eventHub.subscribe("topic=ftrack.test", asyncCallback);
195+
const promises = eventHub._handle(testEvent);
196+
await Promise.all(promises ?? []);
197+
expect(asyncCallback).toHaveBeenCalledWith(testEvent);
198+
expect(publishReplySpy).toHaveBeenCalled();
199+
expect(publishReplySpy).not.toHaveBeenCalledWith(
200+
expect.anything(),
201+
expect.any(Promise),
202+
expect.anything()
203+
);
204+
expect(publishReplySpy).toHaveBeenCalledWith(
205+
expect.anything(),
206+
"someData",
207+
expect.anything()
208+
);
209+
publishReplySpy.mockRestore();
210+
});
155211
});

0 commit comments

Comments
 (0)