2323
2424const {
2525 ArrayPrototypeJoin,
26- ArrayPrototypeShift,
26+ ArrayPrototypePop,
27+ ArrayPrototypePush,
2728 ArrayPrototypeSlice,
2829 ArrayPrototypeSplice,
2930 ArrayPrototypeUnshift,
@@ -33,6 +34,7 @@ const {
3334 FunctionPrototypeBind,
3435 FunctionPrototypeCall,
3536 NumberIsNaN,
37+ NumberMAX_SAFE_INTEGER,
3638 ObjectCreate,
3739 ObjectDefineProperty,
3840 ObjectDefineProperties,
@@ -59,6 +61,8 @@ const {
5961} = require ( 'internal/util/inspect' ) ;
6062
6163let spliceOne ;
64+ let FixedQueue ;
65+ let kFirstEventParam ;
6266
6367const {
6468 AbortError,
@@ -73,6 +77,7 @@ const {
7377} = require ( 'internal/errors' ) ;
7478
7579const {
80+ validateInteger,
7681 validateAbortSignal,
7782 validateBoolean,
7883 validateFunction,
@@ -84,6 +89,7 @@ const kErrorMonitor = Symbol('events.errorMonitor');
8489const kMaxEventTargetListeners = Symbol ( 'events.maxEventTargetListeners' ) ;
8590const kMaxEventTargetListenersWarned =
8691 Symbol ( 'events.maxEventTargetListenersWarned' ) ;
92+ const kWatermarkData = SymbolFor ( 'nodejs.watermarkData' ) ;
8793
8894let EventEmitterAsyncResource ;
8995// The EventEmitterAsyncResource has to be initialized lazily because event.js
@@ -999,25 +1005,44 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
9991005 * Returns an `AsyncIterator` that iterates `event` events.
10001006 * @param {EventEmitter } emitter
10011007 * @param {string | symbol } event
1002- * @param {{ signal: AbortSignal; } } [options]
1008+ * @param {{
1009+ * signal: AbortSignal;
1010+ * close?: string[];
1011+ * highWatermark?: number,
1012+ * lowWatermark?: number
1013+ * }} [options]
10031014 * @returns {AsyncIterator }
10041015 */
1005- function on ( emitter , event , options ) {
1006- const signal = options ?. signal ;
1016+ function on ( emitter , event , options = { } ) {
1017+ // Parameters validation
1018+ const signal = options . signal ;
10071019 validateAbortSignal ( signal , 'options.signal' ) ;
10081020 if ( signal ?. aborted )
10091021 throw new AbortError ( undefined , { cause : signal ?. reason } ) ;
1010-
1011- const unconsumedEvents = [ ] ;
1012- const unconsumedPromises = [ ] ;
1022+ const highWatermark = options . highWatermark ?? NumberMAX_SAFE_INTEGER ;
1023+ validateInteger ( highWatermark , 'options.highWatermark' , 1 ) ;
1024+ const lowWatermark = options . lowWatermark ?? 1 ;
1025+ validateInteger ( lowWatermark , 'options.lowWatermark' , 1 ) ;
1026+
1027+ // Preparing controlling queues and variables
1028+ FixedQueue ??= require ( 'internal/fixed_queue' ) ;
1029+ const unconsumedEvents = new FixedQueue ( ) ;
1030+ const unconsumedPromises = new FixedQueue ( ) ;
1031+ let paused = false ;
10131032 let error = null ;
10141033 let finished = false ;
1034+ let size = 0 ;
10151035
10161036 const iterator = ObjectSetPrototypeOf ( {
10171037 next ( ) {
10181038 // First, we consume all unread events
1019- const value = unconsumedEvents . shift ( ) ;
1020- if ( value ) {
1039+ if ( size ) {
1040+ const value = unconsumedEvents . shift ( ) ;
1041+ size -- ;
1042+ if ( paused && size < lowWatermark ) {
1043+ emitter . resume ( ) ;
1044+ paused = false ;
1045+ }
10211046 return PromiseResolve ( createIterResult ( value , false ) ) ;
10221047 }
10231048
@@ -1032,9 +1057,7 @@ function on(emitter, event, options) {
10321057 }
10331058
10341059 // If the iterator is finished, resolve to done
1035- if ( finished ) {
1036- return PromiseResolve ( createIterResult ( undefined , true ) ) ;
1037- }
1060+ if ( finished ) return closeHandler ( ) ;
10381061
10391062 // Wait until an event happens
10401063 return new Promise ( function ( resolve , reject ) {
@@ -1043,46 +1066,62 @@ function on(emitter, event, options) {
10431066 } ,
10441067
10451068 return ( ) {
1046- eventTargetAgnosticRemoveListener ( emitter , event , eventHandler ) ;
1047- eventTargetAgnosticRemoveListener ( emitter , 'error' , errorHandler ) ;
1048-
1049- if ( signal ) {
1050- eventTargetAgnosticRemoveListener (
1051- signal ,
1052- 'abort' ,
1053- abortListener ,
1054- { once : true } ) ;
1055- }
1056-
1057- finished = true ;
1058-
1059- for ( const promise of unconsumedPromises ) {
1060- promise . resolve ( createIterResult ( undefined , true ) ) ;
1061- }
1062-
1063- return PromiseResolve ( createIterResult ( undefined , true ) ) ;
1069+ return closeHandler ( ) ;
10641070 } ,
10651071
10661072 throw ( err ) {
10671073 if ( ! err || ! ( err instanceof Error ) ) {
10681074 throw new ERR_INVALID_ARG_TYPE ( 'EventEmitter.AsyncIterator' ,
10691075 'Error' , err ) ;
10701076 }
1071- error = err ;
1072- eventTargetAgnosticRemoveListener ( emitter , event , eventHandler ) ;
1073- eventTargetAgnosticRemoveListener ( emitter , 'error' , errorHandler ) ;
1077+ errorHandler ( err ) ;
10741078 } ,
1075-
10761079 [ SymbolAsyncIterator ] ( ) {
10771080 return this ;
1078- }
1081+ } ,
1082+ [ kWatermarkData ] : {
1083+ /**
1084+ * The current queue size
1085+ */
1086+ get size ( ) {
1087+ return size ;
1088+ } ,
1089+ /**
1090+ * The low watermark. The emitter is resumed every time size is lower than it
1091+ */
1092+ get low ( ) {
1093+ return lowWatermark ;
1094+ } ,
1095+ /**
1096+ * The high watermark. The emitter is paused every time size is higher than it
1097+ */
1098+ get high ( ) {
1099+ return highWatermark ;
1100+ } ,
1101+ /**
1102+ * It checks wether the emitter is paused by the watermark controller or not
1103+ */
1104+ get isPaused ( ) {
1105+ return paused ;
1106+ }
1107+ } ,
10791108 } , AsyncIteratorPrototype ) ;
10801109
1081- eventTargetAgnosticAddListener ( emitter , event , eventHandler ) ;
1110+ // Adding event handlers
1111+ const { addEventListener, removeAll } = listenersController ( ) ;
1112+ kFirstEventParam ??= require ( 'internal/events/symbols' ) . kFirstEventParam ;
1113+ addEventListener ( emitter , event , options [ kFirstEventParam ] ? eventHandler : function ( ...args ) {
1114+ return eventHandler ( args ) ;
1115+ } ) ;
10821116 if ( event !== 'error' && typeof emitter . on === 'function' ) {
1083- emitter . on ( 'error' , errorHandler ) ;
1117+ addEventListener ( emitter , 'error' , errorHandler ) ;
1118+ }
1119+ const closeEvents = options ?. close ;
1120+ if ( closeEvents ?. length ) {
1121+ for ( let i = 0 ; i < closeEvents . length ; i ++ ) {
1122+ addEventListener ( emitter , closeEvents [ i ] , closeHandler ) ;
1123+ }
10841124 }
1085-
10861125 if ( signal ) {
10871126 eventTargetAgnosticAddListener (
10881127 signal ,
@@ -1097,27 +1136,48 @@ function on(emitter, event, options) {
10971136 errorHandler ( new AbortError ( undefined , { cause : signal ?. reason } ) ) ;
10981137 }
10991138
1100- function eventHandler ( ...args ) {
1101- const promise = ArrayPrototypeShift ( unconsumedPromises ) ;
1102- if ( promise ) {
1103- promise . resolve ( createIterResult ( args , false ) ) ;
1104- } else {
1105- unconsumedEvents . push ( args ) ;
1106- }
1139+ function eventHandler ( value ) {
1140+ if ( unconsumedPromises . isEmpty ( ) ) {
1141+ size ++ ;
1142+ if ( ! paused && size > highWatermark ) {
1143+ paused = true ;
1144+ emitter . pause ( ) ;
1145+ }
1146+ unconsumedEvents . push ( value ) ;
1147+ } else unconsumedPromises . shift ( ) . resolve ( createIterResult ( value , false ) ) ;
11071148 }
11081149
11091150 function errorHandler ( err ) {
1110- finished = true ;
1151+ if ( unconsumedPromises . isEmpty ( ) ) error = err ;
1152+ else unconsumedPromises . shift ( ) . reject ( err ) ;
11111153
1112- const toError = ArrayPrototypeShift ( unconsumedPromises ) ;
1154+ closeHandler ( ) ;
1155+ }
11131156
1114- if ( toError ) {
1115- toError . reject ( err ) ;
1116- } else {
1117- // The next time we call next()
1118- error = err ;
1157+ function closeHandler ( ) {
1158+ removeAll ( ) ;
1159+ finished = true ;
1160+ const doneResult = createIterResult ( undefined , true ) ;
1161+ while ( ! unconsumedPromises . isEmpty ( ) ) {
1162+ unconsumedPromises . shift ( ) . resolve ( doneResult ) ;
11191163 }
11201164
1121- iterator . return ( ) ;
1165+ return PromiseResolve ( doneResult ) ;
11221166 }
11231167}
1168+
1169+ function listenersController ( ) {
1170+ const listeners = [ ] ;
1171+
1172+ return {
1173+ addEventListener ( emitter , event , handler , flags ) {
1174+ eventTargetAgnosticAddListener ( emitter , event , handler , flags ) ;
1175+ ArrayPrototypePush ( listeners , [ emitter , event , handler , flags ] ) ;
1176+ } ,
1177+ removeAll ( ) {
1178+ while ( listeners . length > 0 ) {
1179+ ReflectApply ( eventTargetAgnosticRemoveListener , undefined , ArrayPrototypePop ( listeners ) ) ;
1180+ }
1181+ }
1182+ } ;
1183+ }
0 commit comments