Kieran Simpson

You've been using Node.js Transform streams wrong

Duty calls

Just like in the XKCD comic, I wrote this article because there is a lot of incorrect information on Stack Overflow, GitHub issues, and other sites about how to properly handle back-pressure in Node.js Transform streams.

I got caught out by a problem where I was losing a lot of data when a Transform stream output more data than was coming into it. The reason was that the Readable buffer in the Transform stream was overflowing because data was being transformed faster than the destination stream could accept it. Even as a seasoned Node.js developer with years of commercial experience writing large-scale applications, I had to do a lot of searching, reading and asking for help to figure out how to solve this problem. Along the way I saw a lot of bad advice and incorrect solutions that would confuse anyone with the same problem who doesn't have in-depth knowledge of how Node.js streams work.

With some help I eventually solved the problem and hopefully this article can help someone else.

To set the scene, I've been working on a large TypeScript project. Part of the project is supporting performance testing of the application services. As the project evolves we have to generate new, very large datasets. For example, from 200MB of input I generated 40GB of output data. That's 200 times as much output as input! The scripts are IO bound so, of course, I turned to streams. Read the input in a Readable stream, generate the data in a Transform stream, and write it to a Writable stream. This is the sort of IO work Node.js is known for.

Conceptually the idea looks like

[Figure: concept of data generation]

Back-pressure - what is it good for?

What makes Node.js streams so good at handling large amounts of IO without exhausting memory is their back-pressure mechanism. It allows streams to signal when they are full and when they can accept more data. Typically this matters when writing to a stream, as writing is often slower than reading.

When piping a Readable stream to a Writable, the pipe method sets up handlers for events on the Writable and controls the flow of data from the Readable into the Writable. Data is pulled[1] from the Readable's internal buffer after the readable event is emitted and written to the Writable using write. If write returns false, i.e. the Writable's buffer is full, the handler pauses[2] the Readable until the Writable emits the drain event, meaning its buffer can accept more data. Then the read loop starts again. Back-pressure ensures that the overall flow of data moves only as fast as the slowest stream in the flow.

While data is being pulled from the Readable's buffer, the stream calls its internal _read method whenever it needs more data in the buffer, and implementations of _read add that data via calls to push. This is important to remember.
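To make the relationship between _read and push concrete, here is a small sketch of a custom Readable. The Counter class, its readCalls counter and the collect helper are made up for illustration. The point is only that _read is invoked again once the consumer has made room, and that push returning false is the cue to stop producing for now.

```javascript
const { Readable } = require("node:stream");

// Hypothetical example class: emits the integers 0..limit-1 in object mode.
class Counter extends Readable {
  constructor(limit, options) {
    super({ ...options, objectMode: true });
    this.limit = limit;
    this.current = 0;
    this.readCalls = 0; // how many times the stream asked for more data
  }

  _read() {
    this.readCalls++;
    while (this.current < this.limit) {
      // push returns false once the internal buffer reaches highWaterMark.
      // Stop producing and wait for the next _read call.
      if (!this.push(this.current++)) return;
    }
    this.push(null); // no more data: end the stream
  }
}

// Hypothetical helper: drain a readable into an array.
async function collect(readable) {
  const items = [];
  for await (const item of readable) items.push(item);
  return items;
}

const counter = new Counter(10, { highWaterMark: 2 });
collect(counter).then((items) => {
  console.log(`${items.length} items delivered over ${counter.readCalls} _read calls`);
});
```

With a highWaterMark of 2 the stream cannot buffer all ten items at once, so _read has to be called several times as the consumer catches up.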

Conceptually the flow of data looks like

[Figure: conceptual sequence of piping between streams]
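As a rough illustration of that sequence, here is a stripped-down sketch of the bookkeeping pipe performs. The simplePipe helper is hypothetical, and it uses the data event with pause and resume instead of the real pipe internals, so treat it as a sketch of the idea and not as how Node.js implements it.

```javascript
const { Readable, Writable } = require("node:stream");

// Hypothetical helper: a simplified stand-in for source.pipe(destination).
function simplePipe(source, destination) {
  source.on("data", (chunk) => {
    const canTakeMore = destination.write(chunk);
    if (!canTakeMore) {
      // The destination's buffer is full: stop reading until it drains.
      source.pause();
      destination.once("drain", () => source.resume());
    }
  });
  source.on("end", () => destination.end());
  return destination;
}

// A deliberately slow destination with a tiny buffer, so write returns false.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

simplePipe(Readable.from([1, 2, 3, 4, 5]), slowSink).on("finish", () => {
  console.log("all chunks written");
});
```

The source only ever runs as fast as the sink acknowledges writes, which is the "slowest stream sets the pace" behaviour described above.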

Transform streams - a tale of two halves

A Transform is a Duplex stream, in that it is both Readable and Writable. It therefore has two buffers, a write buffer and a read buffer. The write buffer holds data from calls to write and the read buffer holds data that is to be consumed by calls to read. Data from the write buffer is given to the _transform method of the Transform stream. Implementations of _transform have to add output to the read buffer via calls to push. _transform signals that a chunk of data from the write buffer has been processed by calling the callback argument.

For my use case, I was inflating the input data: the transformation yielded significantly more bytes than came into the stream. So I ended up filling the Transform's read buffer faster than it could be consumed by the Writable destination the Transform stream was being piped to. Although the pipe handling logic was waiting for the drain event on the destination Writable before continuing to read from the Transform's read buffer, my Transform was pushing so much data, even from a single input chunk, that the buffer overflowed and data started being discarded.

I needed to handle the case where the read buffer was full and to pause the Transform stream until the read buffer could take more data.

This is when my search began. From reading many, many "answers" on this problem I have distilled the bad advice and hacks into two main categories.

Bad advice #1 - ignore the return value from push

I was quite surprised by the number of answers I found which said to ignore the return value from push. This is a very bad idea, because when push returns false it is signalling that the read buffer is full, so the producer (i.e. _transform) needs to wait before pushing more data. Because _transform needs to acknowledge processing of the chunk from the write buffer, this also means not calling callback until the read buffer can take more data. This effectively pauses the Transform stream, and if the write buffer fills up, the stream will return false from write to pause writing into the Transform. This is what back-pressure is meant to do.
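A quick way to see this signal is to push into a Transform that has a deliberately tiny read buffer. This is a sketch only: the 4-byte readableHighWaterMark and the pass-through transform function are arbitrary choices for the demonstration, and nothing is consuming the stream.

```javascript
const { Transform } = require("node:stream");

// A pass-through Transform whose read buffer limit is only 4 bytes.
const tiny = new Transform({
  readableHighWaterMark: 4,
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
});

const first = tiny.push(Buffer.from("ab")); // 2 bytes buffered, under the limit
const second = tiny.push(Buffer.from("cdefgh")); // 8 bytes buffered, over the limit

console.log(first, second, tiny.readableLength); // true false 8
```

In this sketch the second chunk is still added to the buffer, so readableLength grows past the limit. The false return value is the stream asking the producer to stop; it is up to the producer to respect it.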

Unfortunately the Node.js documentation and most of the examples I've seen on the internet ignore the return value from push. There are assumptions being made about the size of the output data relative to the size of the input chunk, and the speed at which the read buffer is emptied relative to how fast data is transformed.

To handle back-pressure correctly, _transform needs to look something like

    _transform(chunk, encoding, callback) {
      const more = this.push(doSomethingWithChunk(chunk, encoding))
      if (!more) {
        // ?? what to listen for here?
      } else {
        callback()
      }
    }

If you see a code example for Transform that doesn't check the return value from push you know it's not back-pressure safe and could break your application.

The question I then asked was what event to listen for to start pushing data again when the read buffer could take more data.

Bad Advice #2 - wait for the drain event

A lot of "answers" to this question get confused about the role of the buffers in a Transform stream, and suggest waiting for the drain event.

    if (!more) {
      this.once("drain", callback)
    } else {
      callback()
    }

This is clearly wrong, as the drain event belongs to the Writable side of the Transform and is a signal to write more data into the write buffer, whereas I need a signal that the read buffer can take more data.

If you wait for the drain event your application will deadlock: the write buffer fills up waiting for _transform to acknowledge that the current chunk is processed, while _transform is waiting for a drain event before it acknowledges the chunk, which will never happen.

Getting ready to read

If drain is an event emitted by the Writable side of the Transform stream, what's the right event for the Readable side? What event signals that producers can start pushing data again?

Turns out there isn't one.

However, we can do it ourselves.

When the Readable buffer needs more data to give to consumers, the _read method is called. Being a Readable, the Transform class implements _read. So we can override it to signal to our _transform method when to continue processing chunks of data.

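    const { Transform } = require("stream")

    class ReadyTransform extends Transform {
      _read(size) {
        // have to emit before calling superclass
        this.emit("ready")
        super._read(size)
      }

      _transform(chunk, encoding, callback) {
        // doSomethingWithChunk is a placeholder for the actual transformation
        const more = this.push(doSomethingWithChunk(chunk, encoding))
        if (!more) {
          // read buffer is full: acknowledge the chunk only once _read emits "ready"
          this.once("ready", callback)
        } else {
          callback()
        }
      }
    }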

This unpauses the Transform as the stream will start processing chunks again and adding to the read buffer.

The solution is quite simple, but it requires real knowledge of how Node.js streams work to write it.

Once I had a workable pattern, I found some extra use cases for it. If there are multiple calls to push for an input chunk, every return value from push needs to be checked with this pattern. Another was _flush: if the Transform needs to output a large amount of data when flushing, the pattern can be reused. There was also the case where async work needs to be done to transform the input data into output data. The solution was to separate the transformation from the pushing logic using Generators (and AsyncGenerators) with continuations.
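To show how those three cases can share one code path, here is a minimal sketch that combines the ready event with generators. The ExpandingTransform class, its expand and tail options, the pushAll helper and the demo function are all hypothetical names invented for this example. It is not the InflatingTransform implementation, just one way the idea could look.

```javascript
const { Readable, Transform, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const { once } = require("node:events");

// Hypothetical class for illustration only.
class ExpandingTransform extends Transform {
  constructor({ expand, tail = () => [], ...streamOptions }) {
    super(streamOptions);
    this.expand = expand; // chunk -> iterable or async iterable of output pieces
    this.tail = tail; // () -> iterable of final pieces to push on flush
  }

  _read(size) {
    // Same trick as ReadyTransform: announce that the read buffer wants data.
    this.emit("ready");
    super._read(size);
  }

  async pushAll(pieces) {
    // for await accepts sync and async iterables, and awaits yielded Promises.
    for await (const piece of pieces) {
      if (!this.push(piece)) {
        // Read buffer is full: wait until _read asks for more.
        await once(this, "ready");
      }
    }
  }

  _transform(chunk, encoding, callback) {
    // Acknowledge the input chunk only after every piece has been pushed.
    this.pushAll(this.expand(chunk)).then(() => callback(), callback);
  }

  _flush(callback) {
    this.pushAll(this.tail()).then(() => callback(), callback);
  }
}

// Usage: inflate each 2-byte chunk 500 times into a slow destination.
async function demo() {
  let bytes = 0;
  const slowSink = new Writable({
    highWaterMark: 8,
    write(chunk, encoding, callback) {
      bytes += chunk.length;
      setImmediate(callback); // simulate a slow destination
    },
  });
  const inflater = new ExpandingTransform({
    readableHighWaterMark: 16,
    *expand(chunk) {
      for (let i = 0; i < 500; i++) yield chunk;
    },
    *tail() {
      yield Buffer.from("END");
    },
  });
  await pipeline(Readable.from(["ab", "cd"]), inflater, slowSink);
  return bytes;
}

demo().then((bytes) => console.log("bytes written:", bytes));
```

Keeping the generator separate from pushAll means the transformation logic never has to know about back-pressure; it just yields pieces, and the pushing loop decides when to wait.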

Inflating Transform

As a result of my work, I have published InflatingTransform, an MIT licensed Transform class that provides the ability to push large amounts of data from a Transform stream while still respecting back-pressure.

A full example is available in the project repo.

The class provides a default implementation of _transform which will use a generator method *_inflate to generate chunks of data to be pushed from a chunk that is written to the stream. Subclasses must override *_inflate, or provide it via the constructor option inflate.

To accommodate generators that need to perform asynchronous work to transform a chunk, generator methods in this class can yield Promises, or an async generator can be used. The class will wait for the Promise to resolve before pushing the value. If the Promise rejects, the error will be passed to the transform callback function.

Subclasses can override the _transform implementation if necessary. However, if push returns false, subclasses should wait for the ready event before pushing more data. They should defer calling the callback passed to the _transform method until after they have pushed everything they can so far.

To accommodate streams that need to push final chunks of data when flushed, the class provides a default implementation of _flush. The method will use a generator method *_burst to generate additional chunks of data to be pushed to the Readable stream. The default implementation of *_burst simply yields null. Subclasses may override *_burst, or provide it via the constructor option burst.

Roll the credits

Hopefully the next time someone has the type of problem where a Transform stream is pushing a lot of output data they find this article and can ignore all the wrong answers on the rest of the internet.

If you found this helpful, please leave a comment below.

Finally, credit for InflatingTransform also belongs to Ben Schmidt, who helped me out by digging into the guts of the streams code and reviewing the development of the library.


Footnotes

[1] - How data is taken from a Readable stream's buffer can vary based on which mode is used. For simplicity, we're just going to use the read() method on the stream when discussing getting data from the stream.

[2] - How Readables are paused is a complicated topic and the curious reader should read the docs. We only care that the Readable is paused to stop data flowing to the Writable when the Writable is full and how that affects the Readable's internal buffer if more data is pushed into it while it is paused.
