Skip to content

Commit e316d1d

Browse files
committed
cleans up hanging eventlisteners
1 parent 4b6f114 commit e316d1d

File tree

2 files changed

+2951
-11
lines changed

2 files changed

+2951
-11
lines changed

lib/stream-to-async-iterator.js

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ export const states = {
1111
errored: Symbol('errored'),
1212
};
1313

14+
/*
15+
* A contract for a promise that requires a clean up
16+
* function be called after the promise finishes.
17+
*/
18+
type PromiseWithCleanUp<T> = {
19+
promise: Promise<T>,
20+
cleanup: () => void,
21+
}
22+
1423
/**
1524
* @typedef {Object} StreamAsyncToIterator~Options
1625
* @property {number} [size] - the size of each read from the stream for each iteration
@@ -106,9 +115,22 @@ export default class StreamAsyncToIterator {
106115
*/
107116
async next(): Promise<Iteration> {
108117
if (this._state === states.notReadable) {
118+
const read = this._untilReadable();
119+
const end = this._untilEnd();
120+
109121
//need to wait until the stream is readable or ended
110-
await Promise.race([this._untilReadable(), this._untilEnd()]);
111-
return this.next();
122+
try {
123+
await Promise.race([read.promise, end.promise]);
124+
return this.next();
125+
}
126+
catch (e) {
127+
throw e
128+
}
129+
finally {
130+
//need to clean up any hanging event listeners
131+
read.cleanup()
132+
end.cleanup()
133+
}
112134
} else if (this._state === states.ended) {
113135
return {done: true, value: null};
114136
} else if (this._state === states.errored) {
@@ -133,34 +155,58 @@ export default class StreamAsyncToIterator {
133155
* @private
134156
* @returns {Promise}
135157
*/
136-
_untilReadable(): Promise<void> {
137-
return new Promise((resolve, reject) => {
138-
const handleReadable = () => {
158+
_untilReadable(): PromiseWithCleanUp<void> {
159+
//let is used here instead of const because the exact reference is
160+
//required to remove it, this is why it is not a curried function that
161+
//accepts resolve & reject as parameters.
162+
let eventListener = null;
163+
164+
const promise = new Promise((resolve, reject) => {
165+
eventListener = () => {
139166
this._state = states.readable;
140167
this._rejections.delete(reject);
141168
resolve();
142169
};
143170

144-
this._stream.once('readable', handleReadable);
171+
//on is used here instead of once, because
172+
//the listener is remove afterwards anyways.
173+
this._stream.on('readable', eventListener);
145174
this._rejections.add(reject);
146175
});
176+
177+
const cleanup = () => {
178+
if (eventListener == null) return;
179+
this._stream.removeListener('readable', eventListener);
180+
};
181+
182+
return { cleanup, promise }
147183
}
148184

149185
/**
150186
* Waits until the stream is ended. Rejects if the stream errored out.
151187
* @private
152188
* @returns {Promise}
153189
*/
154-
_untilEnd(): Promise<void> {
155-
return new Promise((resolve, reject) => {
156-
const handleEnd = () => {
190+
_untilEnd(): PromiseWithCleanUp<void> {
191+
let eventListener = null;
192+
193+
const promise = new Promise((resolve, reject) => {
194+
eventListener = () => {
157195
this._state = states.ended;
158196
this._rejections.delete(reject);
159197
resolve();
160198
};
161-
this._stream.once('end', handleEnd);
199+
200+
this._stream.on('end', eventListener);
162201
this._rejections.add(reject);
163-
})
202+
});
203+
204+
const cleanup = () => {
205+
if (eventListener == null) return;
206+
this._stream.removeListener('end', eventListener);
207+
};
208+
209+
return { cleanup, promise }
164210
}
165211
}
166212

0 commit comments

Comments
 (0)