-
- Notifications
You must be signed in to change notification settings - Fork 33.7k
events: add EventEmitter.on to async iterate over events #27994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fb4052a 0a1b52d 31bb823 c795045 7344bf7 c8b16cf 4542f50 10821a2 b71f016 0c38799 0e9b7fa f1babd5 2e9ffee 346fa56 0e518be c7cda01 35939a5 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -29,12 +29,16 @@ const { | |
| ObjectCreate, | ||
| ObjectDefineProperty, | ||
| ObjectGetPrototypeOf, | ||
| ObjectSetPrototypeOf, | ||
| ObjectKeys, | ||
| Promise, | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| PromiseReject, | ||
| PromiseResolve, | ||
| ReflectApply, | ||
| ReflectOwnKeys, | ||
| Symbol, | ||
| SymbolFor, | ||
| SymbolAsyncIterator | ||
| } = primordials; | ||
| const kRejection = SymbolFor('nodejs.rejection'); | ||
| | ||
| | @@ -62,6 +66,7 @@ function EventEmitter(opts) { | |
| } | ||
| module.exports = EventEmitter; | ||
| module.exports.once = once; | ||
| module.exports.on = on; | ||
| | ||
| // Backwards-compat with node 0.10.x | ||
| EventEmitter.EventEmitter = EventEmitter; | ||
| | @@ -657,3 +662,102 @@ function once(emitter, name) { | |
| emitter.once(name, eventListener); | ||
| }); | ||
| } | ||
| | ||
| const AsyncIteratorPrototype = ObjectGetPrototypeOf( | ||
| ObjectGetPrototypeOf(async function* () {}).prototype); | ||
| | ||
| function createIterResult(value, done) { | ||
| return { value, done }; | ||
| } | ||
| | ||
| function on(emitter, event) { | ||
| const unconsumedEvents = []; | ||
| const unconsumedPromises = []; | ||
| let error = null; | ||
| let finished = false; | ||
| | ||
| const iterator = ObjectSetPrototypeOf({ | ||
| next() { | ||
| // First, we consume all unread events | ||
| const value = unconsumedEvents.shift(); | ||
| if (value) { | ||
| return PromiseResolve(createIterResult(value, false)); | ||
| } | ||
| | ||
| // Then we error, if an error happened | ||
| // This happens one time if at all, because after 'error' | ||
| // we stop listening | ||
| if (error) { | ||
| const p = PromiseReject(error); | ||
| // Only the first element errors | ||
| error = null; | ||
| return p; | ||
| } | ||
| | ||
| // If the iterator is finished, resolve to done | ||
| if (finished) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stupid question: how does | ||
| return PromiseResolve(createIterResult(undefined, true)); | ||
| } | ||
| | ||
| // Wait until an event happens | ||
| return new Promise(function(resolve, reject) { | ||
| unconsumedPromises.push({ resolve, reject }); | ||
| }); | ||
| }, | ||
| | ||
| return() { | ||
| emitter.removeListener(event, eventHandler); | ||
| ||
| emitter.removeListener('error', errorHandler); | ||
| finished = true; | ||
| | ||
| for (const promise of unconsumedPromises) { | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| promise.resolve(createIterResult(undefined, true)); | ||
| } | ||
| | ||
| return PromiseResolve(createIterResult(undefined, true)); | ||
| }, | ||
| | ||
| throw(err) { | ||
| if (!err || !(err instanceof Error)) { | ||
| throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', | ||
| 'Error', err); | ||
| } | ||
| error = err; | ||
| emitter.removeListener(event, eventHandler); | ||
| emitter.removeListener('error', errorHandler); | ||
| }, | ||
| | ||
| [SymbolAsyncIterator]() { | ||
| return this; | ||
| } | ||
| }, AsyncIteratorPrototype); | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| | ||
| emitter.on(event, eventHandler); | ||
| emitter.on('error', errorHandler); | ||
| | ||
| return iterator; | ||
| | ||
| function eventHandler(...args) { | ||
| const promise = unconsumedPromises.shift(); | ||
| if (promise) { | ||
| promise.resolve(createIterResult(args, false)); | ||
| } else { | ||
| unconsumedEvents.push(args); | ||
| } | ||
| } | ||
| | ||
| function errorHandler(err) { | ||
| finished = true; | ||
| | ||
| const toError = unconsumedPromises.shift(); | ||
| | ||
| if (toError) { | ||
| toError.reject(err); | ||
| } else { | ||
| // The next time we call next() | ||
| error = err; | ||
| } | ||
| | ||
| iterator.return(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| 'use strict'; | ||
| | ||
| const common = require('../common'); | ||
| const assert = require('assert'); | ||
| const { on, EventEmitter } = require('events'); | ||
| | ||
| async function basic() { | ||
| const ee = new EventEmitter(); | ||
| process.nextTick(() => { | ||
| ee.emit('foo', 'bar'); | ||
| // 'bar' is a spurious event, we are testing | ||
| // that it does not show up in the iterable | ||
| ee.emit('bar', 24); | ||
| ee.emit('foo', 42); | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| }); | ||
| | ||
| const iterable = on(ee, 'foo'); | ||
| | ||
| const expected = [['bar'], [42]]; | ||
| | ||
| for await (const event of iterable) { | ||
| const current = expected.shift(); | ||
| | ||
| assert.deepStrictEqual(current, event); | ||
| | ||
| if (expected.length === 0) { | ||
| break; | ||
| } | ||
| } | ||
| assert.strictEqual(ee.listenerCount('foo'), 0); | ||
| assert.strictEqual(ee.listenerCount('error'), 0); | ||
| } | ||
| | ||
| async function error() { | ||
| const ee = new EventEmitter(); | ||
| const _err = new Error('kaboom'); | ||
| process.nextTick(() => { | ||
| ee.emit('error', _err); | ||
| }); | ||
| | ||
| const iterable = on(ee, 'foo'); | ||
| let looped = false; | ||
| let thrown = false; | ||
| | ||
| try { | ||
| // eslint-disable-next-line no-unused-vars | ||
| for await (const event of iterable) { | ||
| looped = true; | ||
| } | ||
| } catch (err) { | ||
| thrown = true; | ||
| assert.strictEqual(err, _err); | ||
| } | ||
| assert.strictEqual(thrown, true); | ||
| assert.strictEqual(looped, false); | ||
| } | ||
| | ||
| async function errorDelayed() { | ||
| const ee = new EventEmitter(); | ||
| const _err = new Error('kaboom'); | ||
| process.nextTick(() => { | ||
| ee.emit('foo', 42); | ||
| ee.emit('error', _err); | ||
| }); | ||
| | ||
| const iterable = on(ee, 'foo'); | ||
| const expected = [[42]]; | ||
| let thrown = false; | ||
| | ||
| try { | ||
| for await (const event of iterable) { | ||
| const current = expected.shift(); | ||
| assert.deepStrictEqual(current, event); | ||
| } | ||
| } catch (err) { | ||
| thrown = true; | ||
| assert.strictEqual(err, _err); | ||
| } | ||
| assert.strictEqual(thrown, true); | ||
| assert.strictEqual(ee.listenerCount('foo'), 0); | ||
| assert.strictEqual(ee.listenerCount('error'), 0); | ||
| } | ||
| | ||
| async function throwInLoop() { | ||
| const ee = new EventEmitter(); | ||
| const _err = new Error('kaboom'); | ||
| | ||
| process.nextTick(() => { | ||
| ee.emit('foo', 42); | ||
| }); | ||
| | ||
| try { | ||
| for await (const event of on(ee, 'foo')) { | ||
| assert.deepStrictEqual(event, [42]); | ||
| throw _err; | ||
| } | ||
| } catch (err) { | ||
| assert.strictEqual(err, _err); | ||
| } | ||
| | ||
| assert.strictEqual(ee.listenerCount('foo'), 0); | ||
| assert.strictEqual(ee.listenerCount('error'), 0); | ||
| } | ||
| | ||
| async function next() { | ||
| const ee = new EventEmitter(); | ||
| const iterable = on(ee, 'foo'); | ||
| | ||
| process.nextTick(function() { | ||
| ee.emit('foo', 'bar'); | ||
| ee.emit('foo', 42); | ||
| iterable.return(); | ||
| }); | ||
| | ||
| const results = await Promise.all([ | ||
| iterable.next(), | ||
| iterable.next(), | ||
| iterable.next() | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| ]); | ||
| | ||
| assert.deepStrictEqual(results, [{ | ||
| value: ['bar'], | ||
| done: false | ||
| }, { | ||
| value: [42], | ||
| done: false | ||
| }, { | ||
| value: undefined, | ||
| done: true | ||
| }]); | ||
| | ||
| assert.deepStrictEqual(await iterable.next(), { | ||
| value: undefined, | ||
| done: true | ||
| }); | ||
| } | ||
| | ||
| async function nextError() { | ||
| const ee = new EventEmitter(); | ||
| const iterable = on(ee, 'foo'); | ||
| const _err = new Error('kaboom'); | ||
| process.nextTick(function() { | ||
| ee.emit('error', _err); | ||
| }); | ||
| const results = await Promise.allSettled([ | ||
| iterable.next(), | ||
| iterable.next(), | ||
| iterable.next() | ||
| ]); | ||
| assert.deepStrictEqual(results, [{ | ||
| status: 'rejected', | ||
| reason: _err | ||
| }, { | ||
| status: 'fulfilled', | ||
| value: { | ||
| value: undefined, | ||
| done: true | ||
| } | ||
| }, { | ||
| status: 'fulfilled', | ||
| value: { | ||
| value: undefined, | ||
| done: true | ||
| } | ||
| }]); | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| assert.strictEqual(ee.listeners('error').length, 0); | ||
| } | ||
| | ||
| async function iterableThrow() { | ||
| const ee = new EventEmitter(); | ||
| const iterable = on(ee, 'foo'); | ||
| | ||
| process.nextTick(() => { | ||
| ee.emit('foo', 'bar'); | ||
| ee.emit('foo', 42); // lost in the queue | ||
| iterable.throw(_err); | ||
| }); | ||
| | ||
| const _err = new Error('kaboom'); | ||
| let thrown = false; | ||
| | ||
| assert.throws(() => { | ||
| // No argument | ||
| iterable.throw(); | ||
| }, { | ||
| message: 'The "EventEmitter.AsyncIterator" property must be' + | ||
| ' an instance of Error. Received undefined', | ||
| name: 'TypeError' | ||
| }); | ||
| | ||
| const expected = [['bar'], [42]]; | ||
| | ||
| try { | ||
| for await (const event of iterable) { | ||
| assert.deepStrictEqual(event, expected.shift()); | ||
| } | ||
| } catch (err) { | ||
| thrown = true; | ||
| assert.strictEqual(err, _err); | ||
| } | ||
| assert.strictEqual(thrown, true); | ||
| assert.strictEqual(expected.length, 0); | ||
| assert.strictEqual(ee.listenerCount('foo'), 0); | ||
| assert.strictEqual(ee.listenerCount('error'), 0); | ||
| } | ||
| | ||
| async function run() { | ||
mcollina marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| const funcs = [ | ||
| basic, | ||
| error, | ||
| errorDelayed, | ||
| throwInLoop, | ||
| next, | ||
| nextError, | ||
| iterableThrow | ||
| ]; | ||
| | ||
| for (const fn of funcs) { | ||
| await fn(); | ||
| } | ||
| } | ||
| | ||
| run().then(common.mustCall()); | ||
Uh oh!
There was an error while loading. Please reload this page.