Stream Pipelines in Node.js
Published: Jul 13, 2024
Last updated: Jul 13, 2024
Overview
So far in the Node.js Streams blog series, we've covered the core fundamentals of streams in Node.js and the four fundamental stream types, and we've spent a post each diving into Readable streams and Writable streams.
In this post, we dive into the `pipeline` function from the Node.js stream module: how it can be of use and how we can use it for asynchronous operations.
How to think about pipeline streams
The `stream.pipeline` function is a utility function that lets us easily pipe streams together. It is very aptly named: in the analogies we've used so far, you can think of it as the water network that bridges the different streams.
Water buckets connected by pipes can be used to think of pipeline streams in Node.js
Creating our first pipeline
We can import the `pipeline` function from the `node:stream` module and use it to connect streams together.
```js
const { pipeline, Readable, Writable } = require("node:stream");

const data = ["Hello", "World!"];
const readStream = Readable.from(data);

// Create a writable stream that logs each chunk
const writeStream = new Writable({
  write(chunk, _encoding, callback) {
    console.log("Writing:", chunk.toString());
    callback();
  },
});

pipeline(readStream, writeStream, (err) => {
  if (err) {
    console.error("Pipeline failed.", err);
  } else {
    console.log("Pipeline succeeded.");
  }
});
```
In the above example, we create a `Readable` stream from an array of strings and a `Writable` stream that logs the data to the console. We then use the `pipeline` function to connect the two streams together.
Running it gives the following:
```
$ node pipelines/simple.js
Writing: Hello
Writing: World!
Pipeline succeeded.
```
This simple example demonstrates how we can use the `pipeline` function to connect streams together.
For a more complex pipeline (well, more complex than above), we can connect multiple streams together:
```js
const { pipeline, Readable, Writable, Transform } = require("node:stream");

const data = ["Hello", "World!"];
const readStream = Readable.from(data);

// Create a writable stream that logs each chunk
const writeStream = new Writable({
  write(chunk, _encoding, callback) {
    console.log("Writing:", chunk.toString());
    callback();
  },
});

// Lowercase the first character of each chunk
const lowerCaseTransform = new Transform({
  transform(chunk, _encoding, callback) {
    const str = chunk.toString();
    this.push(`${str.slice(0, 1).toLowerCase()}${str.slice(1)}`);
    callback();
  },
});

// Append a suffix to each chunk
const addSuffixTransform = new Transform({
  transform(chunk, _encoding, callback) {
    this.push(`${chunk.toString()}-0123`);
    callback();
  },
});

pipeline(
  readStream,
  lowerCaseTransform,
  addSuffixTransform,
  writeStream,
  (err) => {
    if (err) {
      console.error("Pipeline failed.", err);
    } else {
      console.log("Pipeline succeeded.");
    }
  }
);
```
In this case, we add in a couple of transforms just to make some string modifications.
The result:
```
$ node pipelines/slightly-more-complex.js
Writing: hello-0123
Writing: world!-0123
Pipeline succeeded.
```
The following sequence diagram can help you visualize how the pipeline operates:
Sequence diagram for our slightly-more-complex pipeline
Why use the pipeline function?
The `pipeline` function from the `stream` module in Node.js offers several important benefits:
Automatic error handling:
- `pipeline` automatically handles errors from any of the streams in the pipeline.
- It ensures that if an error occurs in any stream, all streams are properly closed.
- This prevents potential memory leaks and resource issues.
Simplified code:
- It reduces the boilerplate code needed to properly manage multiple streams.
- You don't need to manually handle 'error' and 'end' events for each stream.
Backpressure management:
- `pipeline` automatically manages backpressure between the streams.
- It ensures that faster streams don't overwhelm slower ones by pausing and resuming as needed.
Cleanup on completion:
- It automatically cleans up resources when the pipeline finishes or errors out.
- This includes closing file descriptors and freeing up memory.
Promise support:
- When used with `util.promisify`, or when imported directly from `node:stream/promises`, it returns a `Promise`, allowing for easier use with async/await syntax (see the short sketch after this list).
Multiple streams:
- It can handle any number of streams, making it easy to create complex data processing pipelines.
Improved performance:
- The internal implementation is optimized for performance, especially when dealing with object mode streams.
Consistent behavior:
- It ensures consistent behavior across different types of streams (readable, writable, transform).
Error propagation:
- Errors are properly propagated through the pipeline, ensuring that error handling is centralized and reliable.
Ordering guarantee:
- It guarantees that data will be processed in the order it's received, which is particularly important for certain types of data processing.
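As a quick illustration of the promise support mentioned above, here's a minimal sketch (the `readable` and `writable` parameters stand in for whichever streams you already have):

```js
const { promisify } = require("node:util");
const stream = require("node:stream");

// Promisify the callback-based pipeline...
const pipelineAsync = promisify(stream.pipeline);
// ...or, equivalently, import the promise-based version directly:
// const { pipeline: pipelineAsync } = require("node:stream/promises");

async function run(readable, writable) {
  // Resolves when the pipeline finishes, rejects if any stream errors
  await pipelineAsync(readable, writable);
  console.log("Pipeline succeeded.");
}
```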
Here's another simple example to illustrate its use with other Node.js modules:
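For instance, here's a minimal sketch (the file names are placeholders; it assumes a local `input.txt` exists) that pipes a file through a SHA-256 hash stream from `node:crypto` and writes the hex digest to disk:

```js
const { pipeline } = require("node:stream");
const { createHash } = require("node:crypto");
const { createReadStream, createWriteStream } = require("node:fs");

pipeline(
  createReadStream("input.txt"),
  // Hash is a Transform stream: it consumes the input and emits the digest once the input ends
  createHash("sha256").setEncoding("hex"),
  createWriteStream("input.txt.sha256"),
  (err) => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);
```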
Using `pipeline` is generally considered a best practice when working with streams in Node.js due to these benefits.
Asynchronous streams
What happens if our streams happen to be asynchronous? We can import `pipeline` from the `node:stream/promises` module to handle this.
```js
const { Readable, Transform, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Simulated async operation
const asyncOperation = (num) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(num * 2);
    }, Math.random() * 1000); // Random delay to simulate varying processing times
  });
};

// Source stream
class NumberSource extends Readable {
  constructor(options) {
    super(options);
    this.current = 0;
    this.max = 10;
  }

  _read() {
    if (this.current < this.max) {
      this.push(this.current.toString());
      this.current++;
    } else {
      this.push(null);
    }
  }
}

// Async transform stream
class AsyncDoubler extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    const num = parseInt(chunk, 10);
    asyncOperation(num)
      .then((result) => {
        this.push(`${num} doubled is ${result}\n`);
        callback();
      })
      .catch(callback);
  }
}

// Destination stream
class ConsoleWriter extends Writable {
  _write(chunk, encoding, callback) {
    console.log(chunk.toString().trim());
    callback();
  }
}

async function runPipeline() {
  try {
    await pipeline(new NumberSource(), new AsyncDoubler(), new ConsoleWriter());
    console.log("Pipeline succeeded");
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}

runPipeline();
```
In this example, we have a `NumberSource` readable stream that emits numbers from 0 to 9, an `AsyncDoubler` transform stream that doubles the numbers asynchronously, and a `ConsoleWriter` writable stream that logs the doubled numbers to the console.
Running the code gives the following:
```
$ node pipelines/async.js
0 doubled is 0
1 doubled is 2
2 doubled is 4
3 doubled is 6
4 doubled is 8
5 doubled is 10
6 doubled is 12
7 doubled is 14
8 doubled is 16
9 doubled is 18
Pipeline succeeded
```
We can visualize the asynchronous pipeline as follows:
Sequence diagram for our asynchronous pipeline
If we had run the callback-based `pipeline` without awaiting it, the surrounding code would have continued executing without waiting for the streams to finish:
```js
// WHAT NOT TO DO
const { pipeline, Readable, Transform, Writable } = require("node:stream");
// const { pipeline } = require("stream/promises");

// Simulated async operation
const asyncOperation = (num) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(num * 2);
    }, Math.random() * 1000); // Random delay to simulate varying processing times
  });
};

// Source stream
class NumberSource extends Readable {
  constructor(options) {
    super(options);
    this.current = 0;
    this.max = 10;
  }

  _read() {
    if (this.current < this.max) {
      this.push(this.current.toString());
      this.current++;
    } else {
      this.push(null);
    }
  }
}

// Async transform stream
class AsyncDoubler extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    const num = parseInt(chunk, 10);
    asyncOperation(num)
      .then((result) => {
        this.push(`${num} doubled is ${result}\n`);
        callback();
      })
      .catch(callback);
  }
}

// Destination stream
class ConsoleWriter extends Writable {
  _write(chunk, encoding, callback) {
    console.log(chunk.toString().trim());
    callback();
  }
}

async function runPipeline() {
  try {
    pipeline(
      new NumberSource(),
      new AsyncDoubler(),
      new ConsoleWriter(),
      (err) => {
        if (err) {
          console.error("Pipeline failed", err);
        } else {
          console.log("Pipeline succeeded");
        }
      }
    );
    console.log("HERE ALREADY");
    process.exit();
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}

runPipeline();
```
Running the above exits the process before the streams get a chance to run.
```
$ node pipelines/async-incorrect.js
HERE ALREADY
```
Multiplexer example
We won't go too deep into other `pipeline` examples in this post, as I will be doing so in some follow-up posts, but we will finish off by looking at an example that uses a `Duplex` stream to create a multiplexer.
In the following example, we create a multiplexer stream that takes two input streams and multiplexes them into a single output stream.
```js
const { Duplex, pipeline, Transform } = require("node:stream");
const { createReadStream, createWriteStream } = require("node:fs");

class Multiplexer extends Duplex {
  constructor(options) {
    super(options);
    this.sources = [];
    this.currentSource = 0;
    this.activeSourcesCount = 0;
  }

  _read(size) {
    this.readFromSources(size);
  }

  readFromSources(size) {
    if (this.activeSourcesCount === 0) {
      this.push(null);
      return;
    }

    let chunk;
    do {
      const source = this.sources[this.currentSource];
      chunk = source.read(size);

      if (chunk !== null) {
        this.push(chunk);
        return;
      }

      this.currentSource = (this.currentSource + 1) % this.sources.length;
    } while (this.currentSource !== 0);

    // If we've gone through all sources without reading, wait for more data
    this.sources.forEach((source) => {
      source.once("readable", () => this.readFromSources(size));
    });
  }

  _write(chunk, encoding, callback) {
    // For simplicity, we're not implementing write functionality
    callback();
  }

  addSource(source) {
    this.sources.push(source);
    this.activeSourcesCount++;
    source.on("end", () => {
      this.activeSourcesCount--;
      if (this.activeSourcesCount === 0) {
        this.push(null);
      }
    });
  }
}

const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Create input streams
const input1 = createReadStream("file1.txt");
const input2 = createReadStream("file2.txt");

// Create output stream
const output = createWriteStream("output.txt");

// Create multiplexer
const multiplexer = new Multiplexer();
multiplexer.addSource(input1);
multiplexer.addSource(input2);

// Use pipeline to connect streams
pipeline(multiplexer, upperCaseTransform, output, (err) => {
  if (err) {
    console.error("Pipeline failed:", err);
  } else {
    console.log("Pipeline succeeded");
  }
});
```
In the example above, we have two sources, `file1.txt` and `file2.txt`, that will be multiplexed into a single output stream. The output is transformed to uppercase before being written to `output.txt`.
If our files contain `Hello, world!` and `Streams are fun` for `file1.txt` and `file2.txt` respectively, then an example of the output we can get in `output.txt` is:
HELLO, WORLD!STREAMS ARE FUN
The sequence diagram for the multiplexer pipeline can be visualized as follows:
Multiplexer pipeline
In the current implementation:
- The multiplexer attempts to read from each source in turn.
- Whichever source has data available first will be read from and its data pushed to the output.
- This can lead to interleaving of data from different sources in an unpredictable way, especially if the sources become readable at different times or at different rates.
That means the order in which the data is written to `output.txt` is not guaranteed.
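If deterministic ordering matters, one alternative (a sketch of my own, not part of the multiplexer example; note that the `end` option of the promise-based `pipeline` requires a fairly recent Node.js release) is to process the sources one after another instead of multiplexing them:

```js
const { once } = require("node:events");
const { createReadStream, createWriteStream } = require("node:fs");
const { pipeline } = require("node:stream/promises");

// Concatenate the sources in a fixed order. File names are placeholders.
async function concatInOrder(sourcePaths, destinationPath) {
  const output = createWriteStream(destinationPath);
  for (const path of sourcePaths) {
    // `end: false` keeps the destination open so each source is appended in turn
    await pipeline(createReadStream(path), output, { end: false });
  }
  output.end();
  await once(output, "finish"); // Wait until everything has been flushed
}

concatInOrder(["file1.txt", "file2.txt"], "output-ordered.txt")
  .then(() => console.log("Done"))
  .catch((err) => console.error("Failed", err));
```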
Usage with other streams from Node.js modules
Finally, let's recap some streams from native Node.js modules that we've seen in previous posts and how we can use them with `pipeline`.
```js
const { pipeline } = require("node:stream");
const { createGzip } = require("node:zlib");
const { createReadStream, createWriteStream } = require("node:fs");

pipeline(
  createReadStream("input.txt"),
  createGzip(),
  createWriteStream("output.txt.gz"),
  (err) => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);
```
In this example, `pipeline` manages the entire process of reading a file, compressing it, and writing the compressed data to a new file. It handles all the necessary error checking and resource management, significantly simplifying the code compared to manually piping these streams together and handling all the events yourself.
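For comparison, here's a sketch of the same pipeline written against the promise-based API from `node:stream/promises` (file names as in the example above):

```js
const { pipeline } = require("node:stream/promises");
const { createGzip } = require("node:zlib");
const { createReadStream, createWriteStream } = require("node:fs");

async function compressFile() {
  try {
    await pipeline(
      createReadStream("input.txt"),
      createGzip(),
      createWriteStream("output.txt.gz")
    );
    console.log("Pipeline succeeded");
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}

compressFile();
```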
Conclusion
Throughout this blog series, we've delved deeply into the intricate world of Node.js streams, exploring their core concepts, different stream types, and practical use cases. Starting from the fundamentals, we learned about Readable and Writable streams and progressed to more advanced topics like Transform and Duplex streams.
In this post, we focused on the `pipeline` function, which simplifies the process of connecting multiple streams. We demonstrated how to create basic and complex pipelines, including handling asynchronous streams with ease using `node:stream/promises`. Additionally, we introduced a multiplexer example to show how multiple input streams can be combined into a single output stream, demonstrating the flexibility and power of Node.js streams.
Streams are a powerful feature in Node.js, providing a way to process data efficiently and effectively. Whether you're working with file I/O, network communication, or any other data processing tasks, mastering streams can significantly enhance your Node.js applications.
In future posts, I will be working through some more specific use cases and examples of how to use streams in Node.js with destinations like S3 buckets.
Disclaimer: This blog post used AI to generate the images used for the analogy.
Photo credit: jagodakondratiuk