Stream Pipelines in Node.js

Published: Jul 13, 2024

Last updated: Jul 13, 2024

Overview

Earlier in this Node.js Streams blog series, we covered the core fundamentals of streams in Node.js, including the four fundamental stream types, and took a closer look at both Readable and Writable streams.

In this post, we are diving into the pipeline function from the Node.js stream module: how it can be of use and how we can use it for asynchronous operations.

How to think about pipeline streams

The stream.pipeline function is a utility function that allows us to easily pipe streams together. It is very aptly named, so you can think of it as the water network that connects the different streams in the analogies we've used so far.

Water buckets connected by pipes can be used to think of pipeline streams in Node.js


Creating our first pipeline

We can import the pipeline function from the node:stream module and use it to connect streams together.

    const { pipeline, Readable, Writable } = require("node:stream");

    const data = ["Hello", "World!"];
    const readStream = Readable.from(data);

    // Create a writable stream that logs each chunk
    const writeStream = new Writable({
      write(chunk, _encoding, callback) {
        console.log("Writing:", chunk.toString());
        callback();
      },
    });

    pipeline(readStream, writeStream, (err) => {
      if (err) {
        console.error("Pipeline failed.", err);
      } else {
        console.log("Pipeline succeeded.");
      }
    });

In the above example, we create a Readable stream from an array of strings and a Writable stream that logs the data to the console. We then use the pipeline function to connect the two streams together.

Running it gives the following:

    $ node pipelines/simple.js
    Writing: Hello
    Writing: World!
    Pipeline succeeded.

This simple example demonstrates how we can use the pipeline function to connect streams together.

For a more complex pipeline (well, more complex than above), we can connect multiple streams together:

    const { pipeline, Readable, Writable, Transform } = require("node:stream");

    const data = ["Hello", "World!"];
    const readStream = Readable.from(data);

    // Create a writable stream that logs each chunk
    const writeStream = new Writable({
      write(chunk, _encoding, callback) {
        console.log("Writing:", chunk.toString());
        callback();
      },
    });

    // Lowercase every chunk
    const lowerCaseTransform = new Transform({
      transform(chunk, _encoding, callback) {
        this.push(chunk.toString().toLowerCase());
        callback();
      },
    });

    // Append a fixed suffix to every chunk
    const addSuffixTransform = new Transform({
      transform(chunk, _encoding, callback) {
        this.push(`${chunk.toString()}-0123`);
        callback();
      },
    });

    pipeline(
      readStream,
      lowerCaseTransform,
      addSuffixTransform,
      writeStream,
      (err) => {
        if (err) {
          console.error("Pipeline failed.", err);
        } else {
          console.log("Pipeline succeeded.");
        }
      }
    );

In this case, we add in a couple of transforms just to make some string modifications.

The result:

    $ node pipelines/slightly-more-complex.js
    Writing: hello-0123
    Writing: world!-0123
    Pipeline succeeded.

The following sequence diagram can help you visualize how the pipeline operates:

Sequence diagram for our slightly-more-complex pipeline


Why use the pipeline function?

The pipeline function from the stream module in Node.js offers several important benefits:

  1. Automatic error handling:

    • pipeline automatically handles errors from any of the streams in the pipeline.
    • It ensures that if an error occurs in any stream, all streams are properly closed.
    • This prevents potential memory leaks and resource issues.
  2. Simplified code:

    • It reduces the boilerplate code needed to properly manage multiple streams.
    • You don't need to manually handle 'error' and 'end' events for each stream.
  3. Backpressure management:

    • pipeline automatically manages backpressure between the streams.
    • It ensures that faster streams don't overwhelm slower ones by pausing and resuming as needed.
  4. Cleanup on completion:

    • It automatically cleans up resources when the pipeline finishes or errors out.
    • This includes closing file descriptors and freeing up memory.
  5. Promise support:

    • When used with util.promisify or when directly imported from node:stream/promises, it can return a Promise, allowing for easier use with async/await syntax.
  6. Multiple streams:

    • It can handle any number of streams, making it easy to create complex data processing pipelines.
  7. Improved performance:

    • The internal implementation is optimized for performance, especially when dealing with object mode streams.
  8. Consistent behavior:

    • It ensures consistent behavior across different types of streams (readable, writable, transform).
  9. Error propagation:

    • Errors are properly propagated through the pipeline, ensuring that error handling is centralized and reliable.
  10. Ordering guarantee:

    • It guarantees that data will be processed in the order it's received, which is particularly important for certain types of data processing.
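To make the error handling and cleanup points above concrete, here is a minimal sketch. The helper name runFailingPipeline and the failOnB transform are made up for this illustration and are not part of Node.js. The transform rejects the second chunk, and the pipeline callback then reports the error message along with each stream's destroyed flag.

```javascript
const { pipeline, Readable, Transform, Writable } = require("node:stream");

// Hypothetical helper for this sketch: builds a three-stage pipeline whose
// transform fails on the second chunk, then reports what pipeline did.
function runFailingPipeline(done) {
  const source = Readable.from(["a", "b", "c"]);

  // Passes chunks through untouched, except "b", which triggers an error
  const failOnB = new Transform({
    transform(chunk, _encoding, callback) {
      if (chunk.toString() === "b") {
        callback(new Error("Cannot process b"));
        return;
      }
      callback(null, chunk);
    },
  });

  // Discards everything it receives
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  pipeline(source, failOnB, sink, (err) => {
    done({
      message: err ? err.message : null,
      sourceDestroyed: source.destroyed,
      transformDestroyed: failOnB.destroyed,
      sinkDestroyed: sink.destroyed,
    });
  });
}

runFailingPipeline((report) => console.log(report));
```

On recent Node.js versions, the report should carry the original error message and show all three streams as destroyed, even though none of them has its own 'error' listener.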

Because of these benefits, using pipeline is generally considered a best practice when working with streams in Node.js. A later section of this post shows it working with streams from other Node.js modules.

Asynchronous streams

What happens if our streams happen to be asynchronous? We can import pipeline from the node:stream/promises module to handle this.

    const { Readable, Transform, Writable } = require("node:stream");
    const { pipeline } = require("node:stream/promises");

    // Simulated async operation
    const asyncOperation = (num) => {
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve(num * 2);
        }, Math.random() * 1000); // Random delay to simulate varying processing times
      });
    };

    // Source stream
    class NumberSource extends Readable {
      constructor(options) {
        super(options);
        this.current = 0;
        this.max = 10;
      }

      _read() {
        if (this.current < this.max) {
          this.push(this.current.toString());
          this.current++;
        } else {
          this.push(null);
        }
      }
    }

    // Async transform stream
    class AsyncDoubler extends Transform {
      constructor(options) {
        super({ ...options, objectMode: true });
      }

      _transform(chunk, encoding, callback) {
        const num = parseInt(chunk, 10);
        asyncOperation(num)
          .then((result) => {
            this.push(`${num} doubled is ${result}\n`);
            callback();
          })
          .catch(callback);
      }
    }

    // Destination stream
    class ConsoleWriter extends Writable {
      _write(chunk, encoding, callback) {
        console.log(chunk.toString().trim());
        callback();
      }
    }

    async function runPipeline() {
      try {
        await pipeline(new NumberSource(), new AsyncDoubler(), new ConsoleWriter());
        console.log("Pipeline succeeded");
      } catch (err) {
        console.error("Pipeline failed", err);
      }
    }

    runPipeline();

In this example, we have a NumberSource readable stream that emits numbers from 0 to 9, an AsyncDoubler transform stream that doubles the numbers asynchronously, and a ConsoleWriter writable stream that logs the doubled numbers to the console.

Running the code gives the following:

    $ node pipelines/async.js
    0 doubled is 0
    1 doubled is 2
    2 doubled is 4
    3 doubled is 6
    4 doubled is 8
    5 doubled is 10
    6 doubled is 12
    7 doubled is 14
    8 doubled is 16
    9 doubled is 18
    Pipeline succeeded

We can visualize the asynchronous pipeline as follows:

Sequence diagram for our asynchronous pipeline


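The callback-based pipeline from node:stream does not return a promise, so there is nothing to await: any code placed after the call runs before the streams have finished. In the following example, process.exit() ends the process before any data has flowed: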

    // WHAT NOT TO DO
    const { pipeline, Readable, Transform, Writable } = require("node:stream");
    // const { pipeline } = require("stream/promises");

    // Simulated async operation
    const asyncOperation = (num) => {
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve(num * 2);
        }, Math.random() * 1000); // Random delay to simulate varying processing times
      });
    };

    // Source stream
    class NumberSource extends Readable {
      constructor(options) {
        super(options);
        this.current = 0;
        this.max = 10;
      }

      _read() {
        if (this.current < this.max) {
          this.push(this.current.toString());
          this.current++;
        } else {
          this.push(null);
        }
      }
    }

    // Async transform stream
    class AsyncDoubler extends Transform {
      constructor(options) {
        super({ ...options, objectMode: true });
      }

      _transform(chunk, encoding, callback) {
        const num = parseInt(chunk, 10);
        asyncOperation(num)
          .then((result) => {
            this.push(`${num} doubled is ${result}\n`);
            callback();
          })
          .catch(callback);
      }
    }

    // Destination stream
    class ConsoleWriter extends Writable {
      _write(chunk, encoding, callback) {
        console.log(chunk.toString().trim());
        callback();
      }
    }

    async function runPipeline() {
      try {
        pipeline(
          new NumberSource(),
          new AsyncDoubler(),
          new ConsoleWriter(),
          (err) => {
            if (err) {
              console.error("Pipeline failed", err);
            } else {
              console.log("Pipeline succeeded");
            }
          }
        );
        console.log("HERE ALREADY");
        process.exit();
      } catch (err) {
        console.error("Pipeline failed", err);
      }
    }

    runPipeline();

Running the above exits the process before the streams have run:

    $ node pipelines/async-incorrect.js
    HERE ALREADY
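As a side note, the promise-based pipeline also accepts async generator functions as intermediate steps, which can be a lighter way to write an asynchronous transform than subclassing Transform. The sketch below is one way to do that, not the only one. The names doubleEach and runGeneratorPipeline are hypothetical, and the delay comes from node:timers/promises, so it assumes a reasonably recent Node.js version.

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const { setTimeout: sleep } = require("node:timers/promises");

// Async generator acting as the transform step: it awaits each
// simulated operation before yielding the next line.
async function* doubleEach(source) {
  for await (const chunk of source) {
    const num = Number.parseInt(chunk.toString(), 10);
    await sleep(Math.random() * 100); // simulated async work
    yield `${num} doubled is ${num * 2}`;
  }
}

// Hypothetical helper: runs the pipeline and resolves with the collected lines.
async function runGeneratorPipeline(numbers) {
  const lines = [];
  const collector = new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      lines.push(chunk.toString());
      callback();
    },
  });

  // Code after this await only runs once every stage has finished.
  await pipeline(Readable.from(numbers), doubleEach, collector);
  return lines;
}

runGeneratorPipeline(["0", "1", "2"]).then((lines) => {
  console.log(lines.join("\n"));
  console.log("Pipeline succeeded");
});
```

Because the generator awaits each chunk before pulling the next one, the lines come out in input order, and the final log only runs after the awaited pipeline has settled.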

Multiplexer example

We won't go too deep into other pipeline examples in this post, as I will be doing so in some follow-up posts, but we will finish off by looking at an example that uses a Duplex stream to create a multiplexer.

In the following example, we create a multiplexer stream that takes two input streams and multiplexes them into a single output stream.

    const { Duplex, pipeline, Transform } = require("node:stream");
    const { createReadStream, createWriteStream } = require("node:fs");

    class Multiplexer extends Duplex {
      constructor(options) {
        super(options);
        this.sources = [];
        this.currentSource = 0;
        this.activeSourcesCount = 0;
      }

      _read(size) {
        this.readFromSources(size);
      }

      readFromSources(size) {
        if (this.activeSourcesCount === 0) {
          this.push(null);
          return;
        }

        let chunk;
        do {
          const source = this.sources[this.currentSource];
          chunk = source.read(size);
          if (chunk !== null) {
            this.push(chunk);
            return;
          }
          this.currentSource = (this.currentSource + 1) % this.sources.length;
        } while (this.currentSource !== 0);

        // If we've gone through all sources without reading, wait for more data
        this.sources.forEach((source) => {
          source.once("readable", () => this.readFromSources(size));
        });
      }

      _write(chunk, encoding, callback) {
        // For simplicity, we're not implementing write functionality
        callback();
      }

      addSource(source) {
        this.sources.push(source);
        this.activeSourcesCount++;
        source.on("end", () => {
          this.activeSourcesCount--;
          if (this.activeSourcesCount === 0) {
            this.push(null);
          }
        });
      }
    }

    const upperCaseTransform = new Transform({
      transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
      },
    });

    // Create input streams
    const input1 = createReadStream("file1.txt");
    const input2 = createReadStream("file2.txt");

    // Create output stream
    const output = createWriteStream("output.txt");

    // Create multiplexer
    const multiplexer = new Multiplexer();
    multiplexer.addSource(input1);
    multiplexer.addSource(input2);

    // Use pipeline to connect streams
    pipeline(multiplexer, upperCaseTransform, output, (err) => {
      if (err) {
        console.error("Pipeline failed:", err);
      } else {
        console.log("Pipeline succeeded");
      }
    });

In the example above, we have two sources file1.txt and file2.txt that will be multiplexed into a single output stream. The output stream will be transformed to uppercase before being written to output.txt.

If file1.txt contains "Hello, world!" and file2.txt contains "Streams are fun", then one possible result in output.txt is:

HELLO, WORLD!STREAMS ARE FUN

The sequence diagram for the multiplexer pipeline can be visualized as follows:

Multiplexer pipeline


In the current implementation:

  • The multiplexer attempts to read from each source in turn.
  • Whichever source has data available first will be read from and its data pushed to the output.
  • This can lead to interleaving of data from different sources in an unpredictable way, especially if the sources become readable at different times or at different rates.

That means the order in which data is written to output.txt is not guaranteed.
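If a deterministic order matters more than interleaving, one option is to drain the sources one after another. Note that this is concatenation rather than true multiplexing, so it swaps in a different technique from the Multiplexer class. The helper names inOrder and concatUpperCase are hypothetical, and the sketch uses in-memory streams as stand-ins for the two files.

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical helper: yields every chunk of the first source, then every
// chunk of the second, and so on. This is concatenation, not multiplexing.
async function* inOrder(sources) {
  for (const source of sources) {
    for await (const chunk of source) {
      yield chunk;
    }
  }
}

// Hypothetical helper: uppercases the concatenated data and resolves with it.
async function concatUpperCase(sources) {
  let output = "";
  const collector = new Writable({
    write(chunk, _encoding, callback) {
      output += chunk.toString().toUpperCase();
      callback();
    },
  });

  await pipeline(Readable.from(inOrder(sources)), collector);
  return output;
}

// In-memory stand-ins for file1.txt and file2.txt.
concatUpperCase([
  Readable.from(["Hello, world!"]),
  Readable.from(["Streams are fun"]),
]).then((output) => console.log(output));
```

The same helper should work with the file streams from the multiplexer example, since readable streams are async iterable. The trade-off is that the second source is not read until the first has ended.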

Usage with other streams from Node.js modules

Finally, let's recap some streams from native Node.js modules that we've seen before in previous posts and how we can use them with pipeline.

    const { pipeline } = require("node:stream");
    const { createGzip } = require("node:zlib");
    const { createReadStream, createWriteStream } = require("node:fs");

    pipeline(
      createReadStream("input.txt"),
      createGzip(),
      createWriteStream("output.txt.gz"),
      (err) => {
        if (err) {
          console.error("Pipeline failed", err);
        } else {
          console.log("Pipeline succeeded");
        }
      }
    );

In this example, pipeline manages the entire process of reading a file, compressing it, and writing the compressed data to a new file. It handles all the necessary error checking and resource management, significantly simplifying the code compared to manually piping these streams together and handling all the events yourself.
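For a quick way to check that a compression pipeline like this preserves its input, here is a small in-memory sketch using the promise-based pipeline. The helper name gzipString is hypothetical, and it swaps the file streams for an in-memory source and a collecting Writable so that it runs without touching the disk.

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const { createGzip, gunzipSync } = require("node:zlib");

// Hypothetical helper: gzips a string through a pipeline and resolves with
// the compressed bytes as a single Buffer.
async function gzipString(text) {
  const chunks = [];
  const collector = new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });

  await pipeline(Readable.from([text]), createGzip(), collector);
  return Buffer.concat(chunks);
}

gzipString("Hello, streams!").then((compressed) => {
  // Decompress to confirm the round trip preserved the input.
  console.log(gunzipSync(compressed).toString());
});
```

Swapping the source and collector back to createReadStream and createWriteStream gives the file-based version shown above, with await in place of the callback.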

Conclusion

Throughout this blog series, we've delved deeply into the intricate world of Node.js streams, exploring their core concepts, different stream types, and practical use cases. Starting from the fundamentals, we learned about Readable and Writable streams and progressed to more advanced topics like Transform and Duplex streams.

In this post, we focused on the pipeline function, which simplifies the process of connecting multiple streams. We demonstrated how to create basic and more complex pipelines, including handling asynchronous streams with the pipeline function from node:stream/promises. Additionally, we introduced a multiplexer example to show how multiple input streams can be combined into a single output stream, demonstrating the flexibility and power of Node.js streams.

Streams are a powerful feature in Node.js, providing a way to process data efficiently and effectively. Whether you're working with file I/O, network communication, or any other data processing tasks, mastering streams can significantly enhance your Node.js applications.

In future posts, I will be working through some more specific use cases and examples of how to use streams in Node.js with destinations like S3 buckets.


Disclaimer: This blog post used AI to generate the images used for the analogy.

Photo credit: jagodakondratiuk


Dennis O'Keeffe

Byron Bay, Australia


2020-present Dennis O'Keeffe.

All Rights Reserved.