@@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using
5959callbacks. The API is accessible via ` require('node:stream/promises') `
6060or ` require('node:stream').promises ` .
6161
62+ ### ` stream.pipeline(source[, ...transforms], destination[, options]) `
63+
64+ ### ` stream.pipeline(streams[, options]) `
65+
66+ <!-- YAML
67+ added: v15.0.0
68+ -->
69+
70+ * ` streams ` {Stream\[ ] |Iterable\[ ] |AsyncIterable\[ ] |Function\[ ] }
71+ * ` source ` {Stream|Iterable|AsyncIterable|Function}
72+ * Returns: {Promise|AsyncIterable}
73+ * ` ...transforms ` {Stream|Function}
74+ * ` source ` {AsyncIterable}
75+ * Returns: {Promise|AsyncIterable}
76+ * ` destination ` {Stream|Function}
77+ * ` source ` {AsyncIterable}
78+ * Returns: {Promise|AsyncIterable}
79+ * ` options ` {Object}
80+ * ` signal ` {AbortSignal}
81+ * ` end ` {boolean}
82+ * Returns: {Promise} Fulfills when the pipeline is complete.
83+
84+ ``` cjs
85+ const { pipeline } = require (' node:stream/promises' );
86+ const fs = require (' node:fs' );
87+ const zlib = require (' node:zlib' );
88+
89+ async function run () {
90+ await pipeline (
91+ fs .createReadStream (' archive.tar' ),
92+ zlib .createGzip (),
93+ fs .createWriteStream (' archive.tar.gz' ),
94+ );
95+ console .log (' Pipeline succeeded.' );
96+ }
97+
98+ run ().catch (console .error );
99+ ```
100+
101+ ``` mjs
102+ import { pipeline } from ' node:stream/promises' ;
103+ import { createReadStream , createWriteStream } from ' node:fs' ;
104+ import { createGzip } from ' node:zlib' ;
105+
106+ await pipeline (
107+ createReadStream (' archive.tar' ),
108+ createGzip (),
109+ createWriteStream (' archive.tar.gz' ),
110+ );
111+ console .log (' Pipeline succeeded.' );
112+ ```
113+
114+ To use an ` AbortSignal ` , pass it inside an options object, as the last argument.
115+ When the signal is aborted, ` destroy ` will be called on the underlying pipeline,
116+ with an ` AbortError ` .
117+
118+ ``` cjs
119+ const { pipeline } = require (' node:stream/promises' );
120+ const fs = require (' node:fs' );
121+ const zlib = require (' node:zlib' );
122+
123+ async function run () {
124+ const ac = new AbortController ();
125+ const signal = ac .signal ;
126+
127+ setImmediate (() => ac .abort ());
128+ await pipeline (
129+ fs .createReadStream (' archive.tar' ),
130+ zlib .createGzip (),
131+ fs .createWriteStream (' archive.tar.gz' ),
132+ { signal },
133+ );
134+ }
135+
136+ run ().catch (console .error ); // AbortError
137+ ```
138+
139+ ``` mjs
140+ import { pipeline } from ' node:stream/promises' ;
141+ import { createReadStream , createWriteStream } from ' node:fs' ;
142+ import { createGzip } from ' node:zlib' ;
143+
144+ const ac = new AbortController ();
145+ const { signal } = ac;
146+ setImmediate (() => ac .abort ());
147+ try {
148+ await pipeline (
149+ createReadStream (' archive.tar' ),
150+ createGzip (),
151+ createWriteStream (' archive.tar.gz' ),
152+ { signal },
153+ );
154+ } catch (err) {
155+ console .error (err); // AbortError
156+ }
157+ ```
158+
159+ The ` pipeline ` API also supports async generators:
160+
161+ ``` cjs
162+ const { pipeline } = require (' node:stream/promises' );
163+ const fs = require (' node:fs' );
164+
165+ async function run () {
166+ await pipeline (
167+ fs .createReadStream (' lowercase.txt' ),
168+ async function * (source , { signal }) {
169+ source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
170+ for await (const chunk of source ) {
171+ yield await processChunk (chunk, { signal });
172+ }
173+ },
174+ fs .createWriteStream (' uppercase.txt' ),
175+ );
176+ console .log (' Pipeline succeeded.' );
177+ }
178+
179+ run ().catch (console .error );
180+ ```
181+
182+ ``` mjs
183+ import { pipeline } from ' node:stream/promises' ;
184+ import { createReadStream , createWriteStream } from ' node:fs' ;
185+
186+ await pipeline (
187+ createReadStream (' lowercase.txt' ),
188+ async function * (source , { signal }) {
189+ source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
190+ for await (const chunk of source ) {
191+ yield await processChunk (chunk, { signal });
192+ }
193+ },
194+ createWriteStream (' uppercase.txt' ),
195+ );
196+ console .log (' Pipeline succeeded.' );
197+ ```
198+
199+ Remember to handle the ` signal ` argument passed into the async generator.
200+ Especially in the case where the async generator is the source for the
201+ pipeline (i.e. first argument) or the pipeline will never complete.
202+
203+ ``` cjs
204+ const { pipeline } = require (' node:stream/promises' );
205+ const fs = require (' node:fs' );
206+
207+ async function run () {
208+ await pipeline (
209+ async function * ({ signal }) {
210+ await someLongRunningfn ({ signal });
211+ yield ' asd' ;
212+ },
213+ fs .createWriteStream (' uppercase.txt' ),
214+ );
215+ console .log (' Pipeline succeeded.' );
216+ }
217+
218+ run ().catch (console .error );
219+ ```
220+
221+ ``` mjs
222+ import { pipeline } from ' node:stream/promises' ;
223+ import fs from ' node:fs' ;
224+ await pipeline (
225+ async function * ({ signal }) {
226+ await someLongRunningfn ({ signal });
227+ yield ' asd' ;
228+ },
229+ fs .createWriteStream (' uppercase.txt' ),
230+ );
231+ console .log (' Pipeline succeeded.' );
232+ ```
233+
234+ The ` pipeline ` API provides [ callback version] [ stream-pipeline ] :
235+
236+ ### ` stream.finished(stream[, options]) `
237+
238+ <!-- YAML
239+ added: v15.0.0
240+ -->
241+
242+ * ` stream ` {Stream}
243+ * ` options ` {Object}
244+ * ` error ` {boolean|undefined}
245+ * ` readable ` {boolean|undefined}
246+ * ` writable ` {boolean|undefined}
247+ * ` signal ` : {AbortSignal|undefined}
248+ * Returns: {Promise} Fulfills when the stream is no
249+ longer readable or writable.
250+
251+ ``` cjs
252+ const { finished } = require (' node:stream/promises' );
253+ const fs = require (' node:fs' );
254+
255+ const rs = fs .createReadStream (' archive.tar' );
256+
257+ async function run () {
258+ await finished (rs);
259+ console .log (' Stream is done reading.' );
260+ }
261+
262+ run ().catch (console .error );
263+ rs .resume (); // Drain the stream.
264+ ```
265+
266+ ``` mjs
267+ import { finished } from ' node:stream/promises' ;
268+ import { createReadStream } from ' node:fs' ;
269+
270+ const rs = createReadStream (' archive.tar' );
271+
272+ async function run () {
273+ await finished (rs);
274+ console .log (' Stream is done reading.' );
275+ }
276+
277+ run ().catch (console .error );
278+ rs .resume (); // Drain the stream.
279+ ```
280+
281+ The ` finished ` API provides [ callback version] [ stream-finished ] :
282+
62283### Object mode
63284
64285All streams created by Node.js APIs operate exclusively on strings and ` Buffer `
@@ -2447,22 +2668,7 @@ Especially useful in error handling scenarios where a stream is destroyed
24472668prematurely (like an aborted HTTP request), and will not emit ` 'end' `
24482669or ` 'finish' ` .
24492670
2450- The ` finished ` API provides promise version:
2451-
2452- ``` js
2453- const { finished } = require (' node:stream/promises' );
2454- const fs = require (' node:fs' );
2455-
2456- const rs = fs .createReadStream (' archive.tar' );
2457-
2458- async function run () {
2459- await finished (rs);
2460- console .log (' Stream is done reading.' );
2461- }
2462-
2463- run ().catch (console .error );
2464- rs .resume (); // Drain the stream.
2465- ```
2671+ The ` finished ` API provides [ promise version] [ stream-finished-promise ] .
24662672
24672673` stream.finished() ` leaves dangling event listeners (in particular
24682674` 'error' ` , ` 'end' ` , ` 'finish' ` and ` 'close' ` ) after ` callback ` has been
@@ -2542,97 +2748,7 @@ pipeline(
25422748);
25432749```
25442750
2545- The ` pipeline ` API provides a promise version, which can also
2546- receive an options argument as the last parameter with a
2547- ` signal ` {AbortSignal} property. When the signal is aborted,
2548- ` destroy ` will be called on the underlying pipeline, with an
2549- ` AbortError ` .
2550-
2551- ``` js
2552- const { pipeline } = require (' node:stream/promises' );
2553- const fs = require (' node:fs' );
2554- const zlib = require (' node:zlib' );
2555-
2556- async function run () {
2557- await pipeline (
2558- fs .createReadStream (' archive.tar' ),
2559- zlib .createGzip (),
2560- fs .createWriteStream (' archive.tar.gz' ),
2561- );
2562- console .log (' Pipeline succeeded.' );
2563- }
2564-
2565- run ().catch (console .error );
2566- ```
2567-
2568- To use an ` AbortSignal ` , pass it inside an options object,
2569- as the last argument:
2570-
2571- ``` js
2572- const { pipeline } = require (' node:stream/promises' );
2573- const fs = require (' node:fs' );
2574- const zlib = require (' node:zlib' );
2575-
2576- async function run () {
2577- const ac = new AbortController ();
2578- const signal = ac .signal ;
2579-
2580- setTimeout (() => ac .abort (), 1 );
2581- await pipeline (
2582- fs .createReadStream (' archive.tar' ),
2583- zlib .createGzip (),
2584- fs .createWriteStream (' archive.tar.gz' ),
2585- { signal },
2586- );
2587- }
2588-
2589- run ().catch (console .error ); // AbortError
2590- ```
2591-
2592- The ` pipeline ` API also supports async generators:
2593-
2594- ``` js
2595- const { pipeline } = require (' node:stream/promises' );
2596- const fs = require (' node:fs' );
2597-
2598- async function run () {
2599- await pipeline (
2600- fs .createReadStream (' lowercase.txt' ),
2601- async function * (source , { signal }) {
2602- source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
2603- for await (const chunk of source ) {
2604- yield await processChunk (chunk, { signal });
2605- }
2606- },
2607- fs .createWriteStream (' uppercase.txt' ),
2608- );
2609- console .log (' Pipeline succeeded.' );
2610- }
2611-
2612- run ().catch (console .error );
2613- ```
2614-
2615- Remember to handle the ` signal ` argument passed into the async generator.
2616- Especially in the case where the async generator is the source for the
2617- pipeline (i.e. first argument) or the pipeline will never complete.
2618-
2619- ``` js
2620- const { pipeline } = require (' node:stream/promises' );
2621- const fs = require (' node:fs' );
2622-
2623- async function run () {
2624- await pipeline (
2625- async function * ({ signal }) {
2626- await someLongRunningfn ({ signal });
2627- yield ' asd' ;
2628- },
2629- fs .createWriteStream (' uppercase.txt' ),
2630- );
2631- console .log (' Pipeline succeeded.' );
2632- }
2633-
2634- run ().catch (console .error );
2635- ```
2751+ The ` pipeline ` API provides a [ promise version] [ stream-pipeline-promise ] .
26362752
26372753` stream.pipeline() ` will call ` stream.destroy(err) ` on all streams except:
26382754
@@ -4566,7 +4682,11 @@ contain multi-byte characters.
45664682[ stream-_write ] : #writable_writechunk-encoding-callback
45674683[ stream-_writev ] : #writable_writevchunks-callback
45684684[ stream-end ] : #writableendchunk-encoding-callback
4685+ [ stream-finished ] : #streamfinishedstream-options-callback
4686+ [ stream-finished-promise ] : #streamfinishedstream-options
45694687[ stream-pause ] : #readablepause
4688+ [ stream-pipeline ] : #streampipelinesource-transforms-destination-callback
4689+ [ stream-pipeline-promise ] : #streampipelinesource-transforms-destination-options
45704690[ stream-push ] : #readablepushchunk-encoding
45714691[ stream-read ] : #readablereadsize
45724692[ stream-resume ] : #readableresume
0 commit comments