When working with streams one of the coolest things ever is using async generators to do piped processing. Let's start with a simple example that generates numbers from 1 to 5:
const numbers = async function* () { for (let i = 0; i < 5; i++) yield i.toString() }
This generator yields 5 numbers, 0 through 4, as strings. Let's pipe them through to see them on the screen:
Readable.from(numbers()).pipe(process.stdout)
Boom! Just like that, we're printing the content of the stream to standard output. Let's see how we can do the printing using console.log()
:
const logger = async function* ( source: AsyncGenerator<string> ) { for await (const number of source) { console.log(number) } } Readable.from(numbers()) .pipe(Duplex.from(logger)) .resume()
The call to Readable.resume()
at the end makes the stream pull all the values and process them.
What's interesting here is that we're using a Duplex
stream created from an async generator. Since it is a Duplex
stream (so Readable
and Writeable
) we can still process the data further. For example, we can augment the numbers with some description:
const augmenter = async function* (source: AsyncGenerator<string>) { for await (const number of source) { yield `This is number ${number}` } } Readable.from(numbers()) .pipe(Duplex.from(augmenter)) .pipe(Duplex.from(logger)) .resume()
Please note that the source
argument to all the generators have a generic type, that will determine the type of element we are iterating over. In our case, the generic type is string
so the number
variable is of type string
.
I think that's totally cool - how about you?
Happy coding!
Top comments (0)