@@ -45,6 +45,8 @@ const {
4545 ERR_UNKNOWN_ENCODING
4646} = require ( 'internal/errors' ) . codes ;
4747
48+ const { errorOrDestroy } = destroyImpl ;
49+
4850util . inherits ( Writable , Stream ) ;
4951
5052function nop ( ) { }
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
147149 // Should close be emitted on destroy. Defaults to true.
148150 this . emitClose = options . emitClose !== false ;
149151
152+ // Should .destroy() be called after 'finish' (and potentially 'end')
153+ this . autoDestroy = ! ! options . autoDestroy ;
154+
150155 // count buffered requests
151156 this . bufferedRequestCount = 0 ;
152157
@@ -235,14 +240,14 @@ function Writable(options) {
235240
236241// Otherwise people can pipe Writable streams, which is just wrong.
237242Writable . prototype . pipe = function ( ) {
238- this . emit ( 'error' , new ERR_STREAM_CANNOT_PIPE ( ) ) ;
243+ errorOrDestroy ( this , new ERR_STREAM_CANNOT_PIPE ( ) ) ;
239244} ;
240245
241246
242247function writeAfterEnd ( stream , cb ) {
243248 var er = new ERR_STREAM_WRITE_AFTER_END ( ) ;
244249 // TODO: defer error events consistently everywhere, not just the cb
245- stream . emit ( 'error' , er ) ;
250+ errorOrDestroy ( stream , er ) ;
246251 process . nextTick ( cb , er ) ;
247252}
248253
@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
258263 er = new ERR_INVALID_ARG_TYPE ( 'chunk' , [ 'string' , 'Buffer' ] , chunk ) ;
259264 }
260265 if ( er ) {
261- stream . emit ( 'error' , er ) ;
266+ errorOrDestroy ( stream , er ) ;
262267 process . nextTick ( cb , er ) ;
263268 return false ;
264269 }
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
422427 // after error
423428 process . nextTick ( finishMaybe , stream , state ) ;
424429 stream . _writableState . errorEmitted = true ;
425- stream . emit ( 'error' , er ) ;
430+ errorOrDestroy ( stream , er ) ;
426431 } else {
427432 // the caller expect this to happen before if
428433 // it is async
429434 cb ( er ) ;
430435 stream . _writableState . errorEmitted = true ;
431- stream . emit ( 'error' , er ) ;
436+ errorOrDestroy ( stream , er ) ;
432437 // this can emit finish, but finish must
433438 // always follow error
434439 finishMaybe ( stream , state ) ;
@@ -612,7 +617,7 @@ function callFinal(stream, state) {
612617 stream . _final ( ( err ) => {
613618 state . pendingcb -- ;
614619 if ( err ) {
615- stream . emit ( 'error' , err ) ;
620+ errorOrDestroy ( stream , err ) ;
616621 }
617622 state . prefinished = true ;
618623 stream . emit ( 'prefinish' ) ;
@@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
639644 if ( state . pendingcb === 0 ) {
640645 state . finished = true ;
641646 stream . emit ( 'finish' ) ;
647+
648+ if ( state . autoDestroy ) {
649+ // In case of duplex streams we need a way to detect
650+ // if the readable side is ready for autoDestroy as well
651+ const rState = stream . _readableState ;
652+ if ( ! rState || ( rState . autoDestroy && rState . endEmitted ) ) {
653+ stream . destroy ( ) ;
654+ }
655+ }
642656 }
643657 }
644658 return need ;
0 commit comments