Readable Streams in Node.js
Published: Jul 8, 2024
Last updated: Jul 9, 2024
Overview
In my previous post, we learned everything we need to know to understand Node.js streams. We covered the four fundamental stream types in Node.js: Writable, Readable, Duplex, and Transform. We also discussed the benefits of using streams, key concepts like buffering, backpressure, and piping, and the importance of events in stream processing.
In this post, we will dive deeper into Readable streams in particular: how they are created, how we can consume data from them, and a number of examples to get you comfortable.
How to create Readable streams
In our previous post, we started with the mental model that Readable streams are like water taps. In the same way that we can turn on the tap, change the flow rate, or turn it off, a Readable stream provides data in chunks that we can consume, pause, resume, or end.
The glorious water tap: our mental starting point for Readable streams
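The sketch below makes the tap analogy concrete by turning the tap off and on again with `pause()` and `resume()`. It is a minimal illustration rather than code from the original post. It uses `Readable.from()`, which is covered below. The `drip` chunks and the 50 ms delay are arbitrary values chosen for the example.

```javascript
const { Readable } = require("node:stream");

// Each array element becomes one chunk ("drip") of the stream
const tap = Readable.from(["drip 1", "drip 2", "drip 3"]);
const seen = [];

const done = new Promise((resolve, reject) => {
  // Attaching a 'data' listener turns the tap on (flowing mode)
  tap.on("data", (chunk) => {
    seen.push(chunk);
    tap.pause(); // turn the tap off: no more 'data' events until resume()
    console.log(`got ${chunk}, paused: ${tap.isPaused()}`);
    setTimeout(() => tap.resume(), 50); // turn it back on a little later
  });
  // 'end' fires once the source has run dry
  tap.on("end", () => resolve(seen));
  tap.on("error", reject);
});
```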
But how can we create a Readable stream in Node.js? There are a number of ways, but let's start with what the `node:stream` module provides.
- stream.Readable.from()
- stream.from()
- stream.Readable class
- stream.Readable.fromWeb()
From top to bottom, I've listed what I think are the most approachable ways to create a Readable stream from the `node:stream` module, so let's go through each of them.
stream.Readable.from()
The `stream.Readable.from()` method creates a Readable stream from an iterable or async iterable object. This is useful when you have an iterable data source, for example an array of strings, that you want to convert into a stream.
```javascript
const { Readable } = require("node:stream");

const readableStream = Readable.from(["Hello", " ", "World", "!"]);
```
In the above example, we create a Readable stream from an array of strings. The `Readable.from()` method automatically converts the array into a stream that emits each string as a chunk of data.
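One quick way to see what `Readable.from()` produces is to consume it with `for await...of`, because Readable streams are async iterables. This is a minimal sketch. The `greetings()` generator and the `collect()` helper are illustrative names, not part of Node.js. The example also shows two other points. An async generator works as a source. And `Readable.from()` defaults to object mode, which is why each array element stays a separate chunk.

```javascript
const { Readable } = require("node:stream");

// Illustrative async generator: also an async iterable, so Readable.from() accepts it
async function* greetings() {
  yield "Hello";
  yield " ";
  yield "World";
  yield "!";
}

// Illustrative helper: drain a Readable into an array, one entry per chunk
async function collect(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return chunks;
}

const fromArray = Readable.from(["Hello", " ", "World", "!"]);
const fromGenerator = Readable.from(greetings());

// Readable.from() sets objectMode: true unless told otherwise
console.log(fromArray.readableObjectMode); // true

const done = Promise.all([collect(fromArray), collect(fromGenerator)]);
done.then(([a]) => console.log(a.join(""))); // Hello World!
```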
For the sake of completeness, here is a reminder of how that Readable stream could be paired with a Writable stream to pipe data from one to the other:
```javascript
const { Readable } = require("node:stream");
const fs = require("node:fs");

// Create a Readable stream from an array of strings
const readableStream = Readable.from(["Hello", " ", "World", "!"]);

// Create a Writable stream (in this case, writing to a file)
const writableStream = fs.createWriteStream("output.txt");

// Pipe the Readable stream to the Writable stream
readableStream.pipe(writableStream);

// Handle the 'finish' event on the Writable stream
writableStream.on("finish", () => {
  console.log("Finished writing to file");
});

// Handle errors from the Writable stream (pipe() does not forward errors between streams)
writableStream.on("error", (err) => {
  console.error("An error occurred:", err);
});
```
In the above scenario, our `readableStream` is piped to the `writableStream`, which writes to the file `output.txt`.
The array `["Hello", " ", "World", "!"]` is an iterable with four elements, and each element becomes a separate chunk in the resulting stream.
The `writableStream` consumes each chunk of data and writes it to `output.txt`. The `finish` event is emitted once all data has been written to the file, and the `error` event is emitted if an error occurs during writing.
To solidify this, here is a verbose sequence diagram:
An overview of how the above code is processed chunk-by-chunk
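You can watch the same chunk-by-chunk flow without creating `output.txt`. The minimal sketch below swaps the file for a small in-memory Writable. The `received` array and the `memoryWritable` name are made up for illustration.

```javascript
const { Readable, Writable } = require("node:stream");

const received = [];

// Illustrative in-memory Writable standing in for output.txt
const memoryWritable = new Writable({
  write(chunk, encoding, callback) {
    // Strings written to a non-object-mode Writable arrive as Buffers by default
    received.push(chunk.toString());
    callback(); // signal that this chunk has been handled
  },
});

const done = new Promise((resolve, reject) => {
  memoryWritable.on("finish", () => {
    console.log("Finished writing:", received.join(""));
    resolve(received);
  });
  memoryWritable.on("error", reject);
});

Readable.from(["Hello", " ", "World", "!"]).pipe(memoryWritable);
```

Each array element reaches `write()` as its own call, which matches the four chunks described above.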
stream.from()
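One caveat: the `node:stream` module doesn't actually export a standalone `from()` function, so `const { from } = require("node:stream")` leaves `from` as `undefined`. If you want a bare `from()`, you can borrow `stream.Readable.from()` as a standalone function:
```javascript
const { Readable } = require("node:stream");
const from = Readable.from.bind(Readable); // a standalone alias for Readable.from

// These two lines are equivalent:
const stream1 = from(["Hello", "World"]);
const stream2 = Readable.from(["Hello", "World"]);
```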
stream.Readable class
The `stream.Readable` class is a base class for creating custom Readable streams. You can extend it to create your own Readable streams by implementing the `_read()` method.
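```javascript
const { Readable } = require("node:stream");

class MyReadable extends Readable {
  constructor(data, options) {
    super(options);
    this.data = [...data]; // copy so we don't mutate the caller's array
  }

  _read() {
    if (this.data.length) {
      // Push the next chunk into the internal buffer
      this.push(this.data.shift());
    } else {
      // Pushing null signals the end of the stream
      this.push(null);
    }
  }
}

const myReadable = new MyReadable(["Hello", " ", "World", "!"]);
```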
In the above example, we create a custom Readable stream by extending the `stream.Readable` class. The `_read()` method pushes chunks from the `data` array into the stream. Once all data has been pushed, `null` is pushed to signal the end of the stream.
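For simple cases you don't need a subclass at all. The `Readable` constructor also accepts a `read()` function in its options, which the Node.js docs call simplified construction. The minimal sketch below builds an equivalent stream. `createArrayStream()` is a hypothetical helper name.

```javascript
const { Readable } = require("node:stream");

// Hypothetical helper: same behavior as MyReadable, without a subclass
function createArrayStream(items) {
  const queue = [...items]; // copy so the caller's array isn't mutated
  return new Readable({
    read() {
      // push(null) signals end-of-stream once the queue is empty
      this.push(queue.length ? queue.shift() : null);
    },
  });
}

const done = (async () => {
  const chunks = [];
  for await (const chunk of createArrayStream(["Hello", " ", "World", "!"])) {
    chunks.push(chunk.toString()); // byte mode: chunks arrive as Buffers
  }
  return chunks.join("");
})();

done.then((text) => console.log(text)); // Hello World!
```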
stream.Readable.fromWeb()
`stream.Readable.fromWeb()` converts a Web API `ReadableStream` into a Node.js Readable stream. Despite the name, it isn't about data coming "from the web" in the sense of network requests; it's about compatibility between Web API streams and Node.js streams. Note that it is a static method on `Readable` from `node:stream`, not an export of `node:stream/web`.
```javascript
const { Readable } = require("node:stream");
const { ReadableStream } = require("node:stream/web");

// Create a Web API ReadableStream (this could come from a fetch() response, for example)
const webReadableStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");
    controller.enqueue("World");
    controller.close();
  },
});

// Convert it to a Node.js Readable stream
const nodeReadableStream = Readable.fromWeb(webReadableStream);

// Use the Node.js stream
nodeReadableStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

nodeReadableStream.on("end", () => {
  console.log("Stream ended");
});
```
In the above example, we create a Web API `ReadableStream` that emits two chunks of data ("Hello" and "World") and then closes. We then convert it to a Node.js Readable stream using `Readable.fromWeb()`.
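The Node.js stream returned by `Readable.fromWeb()` is in byte mode by default. That means string chunks come out as Buffers, which is why the example calls `chunk.toString()`. To keep the original JavaScript values, you can pass `objectMode: true` in the options object that `Readable.fromWeb()` accepts. Here is a minimal sketch that reuses the same two-chunk source:

```javascript
const { Readable } = require("node:stream");
const { ReadableStream } = require("node:stream/web");

const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");
    controller.enqueue("World");
    controller.close();
  },
});

// objectMode: true passes each enqueued value through unchanged (no Buffer conversion)
const nodeStream = Readable.fromWeb(webStream, { objectMode: true });

const done = (async () => {
  const chunks = [];
  for await (const chunk of nodeStream) {
    chunks.push(chunk);
  }
  return chunks;
})();

done.then((chunks) => console.log(chunks)); // [ 'Hello', 'World' ]
```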
For a more practical example, here is how it works with the web `fetch` API:
```javascript
const { Readable } = require("node:stream");
const { createWriteStream } = require("node:fs");
const { pipeline } = require("node:stream/promises");

async function downloadFile(url, outputPath) {
  const response = await fetch(url); // fetch() is a global in Node.js 18+
  if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);

  // response.body is a Web API ReadableStream
  const nodeStream = Readable.fromWeb(response.body);

  // Create a writable stream to the output file
  const fileStream = createWriteStream(outputPath);

  // Use pipeline to pipe the data and handle cleanup
  await pipeline(nodeStream, fileStream);
  console.log(`File downloaded to ${outputPath}`);
}

downloadFile("https://example.com/somefile.txt", "output.txt").catch(console.error);
```
In the above, we use the `fetch` API to download a file from a URL. The response body is a Web API `ReadableStream`, which we convert to a Node.js Readable stream using `Readable.fromWeb()`. We then pipe this stream to a Writable stream that writes the data to a file.
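You can try the same conversion without a network connection by building a `Response` locally and collecting the body in memory instead of writing a file. The `Response` class is a global in Node.js 18+, alongside `fetch`. This is a minimal sketch, and the `readResponseBody()` helper and the sample text are illustrative assumptions:

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Illustrative helper: read a fetch-style Response body through Node.js streams
async function readResponseBody(response) {
  const parts = [];
  // In-memory Writable that collects bytes instead of writing to disk
  const sink = new Writable({
    write(chunk, encoding, callback) {
      parts.push(chunk);
      callback();
    },
  });
  // response.body is a Web API ReadableStream of Uint8Array chunks
  await pipeline(Readable.fromWeb(response.body), sink);
  return Buffer.concat(parts).toString("utf8");
}

// Stand-in for `await fetch(url)`, built locally with the global Response class
const fakeResponse = new Response("Hello from a fake download");
const done = readResponseBody(fakeResponse);
done.then((text) => console.log(text)); // Hello from a fake download
```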
Although we haven't covered it yet, the `downloadFile()` example above also uses `pipeline()` from `node:stream/promises`, which pipes the streams together and takes care of error handling and cleanup.
A visual look at the download sequence:
A sequence diagram of how the above downloadFile function works
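Since `pipeline()` came up above, here is a minimal sketch of why it's handy compared with chaining `.pipe()` calls. If any stream in the chain fails, the returned promise rejects with that error and the remaining streams are destroyed for you. `flakySource()` is a hypothetical generator that fails after one chunk.

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical source that yields one chunk, then fails
async function* flakySource() {
  yield "first chunk";
  throw new Error("source failed");
}

// A Writable that accepts and discards whatever it is given
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

const done = pipeline(Readable.from(flakySource()), sink).then(
  () => "completed",
  (err) => {
    // The source's error surfaces here; pipeline() has destroyed the sink as part of cleanup
    console.log(`Pipeline failed: ${err.message} (sink destroyed: ${sink.destroyed})`);
    return err.message;
  }
);
```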
Other ways to create a Readable stream
process.stdin
`process.stdin` provides a built-in Readable stream for standard input, which you can hook into with event listeners because streams extend the `EventEmitter` class:
```javascript
process.stdin.on("data", (chunk) => {
  console.log("Received input:", chunk.toString());
});
```
http.IncomingMessage
Another built-in Readable stream is `http.IncomingMessage`, which Node.js' `node:http` module creates for every incoming HTTP request and passes to your handler as `req`:
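```javascript
const http = require("node:http");

const server = http.createServer((req, res) => {
  // req is an instance of http.IncomingMessage
  req.on("data", (chunk) => {
    console.log("Received data:", chunk.toString());
  });

  // Respond once the whole request body has been read, so the client isn't left hanging
  req.on("end", () => {
    res.end("OK");
  });
});

server.listen(3000);
```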
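The self-contained sketch below shows `req` behaving like any other Readable stream. The server collects the request body with `for await...of`, and the script sends itself one request. Several details are illustrative choices: port `0` (let the OS pick a free port), the `127.0.0.1` host, the `Connection: close` header, and the `"hello"` body. The built-in `fetch` assumes Node.js 18+.

```javascript
const http = require("node:http");

const server = http.createServer(async (req, res) => {
  // req is a Readable stream, so the whole body can be collected with for await...of
  let body = "";
  for await (const chunk of req) {
    body += chunk;
  }
  // Connection: close lets the server shut down promptly after this one request
  res.writeHead(200, { Connection: "close" });
  res.end(`Received: ${body}`);
});

const done = new Promise((resolve, reject) => {
  server.listen(0, "127.0.0.1", async () => {
    const { port } = server.address();
    try {
      const response = await fetch(`http://127.0.0.1:${port}`, {
        method: "POST",
        body: "hello",
      });
      resolve(await response.text());
    } catch (err) {
      reject(err);
    } finally {
      server.close();
    }
  });
});

done.then((text) => console.log(text)); // Received: hello
```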
child_process.spawn()
`child_process.spawn()` creates a child process whose standard output and standard error are exposed as Readable streams (`child.stdout` and `child.stderr`):
```javascript
const { spawn } = require("node:child_process");

const child = spawn("ls", ["-l"]);

child.stdout.on("data", (data) => {
  console.log(`stdout: ${data}`);
});

child.stderr.on("data", (data) => {
  console.error(`stderr: ${data}`);
});
```
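`ls -l` isn't available on every platform. This minimal, portable sketch instead spawns Node.js itself via `process.execPath`, running a one-line script (an arbitrary example) that writes to both streams. It collects each Readable and waits for the child's `close` event.

```javascript
const { spawn } = require("node:child_process");

// Run a tiny Node.js script as the child so the example works on any OS
const child = spawn(process.execPath, [
  "-e",
  "console.log('to stdout'); console.error('to stderr')",
]);

let stdout = "";
let stderr = "";
child.stdout.setEncoding("utf8"); // receive strings instead of Buffers
child.stderr.setEncoding("utf8");
child.stdout.on("data", (data) => { stdout += data; });
child.stderr.on("data", (data) => { stderr += data; });

const done = new Promise((resolve, reject) => {
  child.on("error", reject);
  // 'close' fires after the child exits and its stdio streams have ended
  child.on("close", (code) => resolve({ code, stdout, stderr }));
});

done.then((result) => console.log(result));
```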
Compression
Built-in modules like `zlib` provide Transform streams for compression and decompression. A Transform is also Readable, so you can consume its output like any other Readable stream:
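```javascript
const zlib = require("node:zlib");
const fs = require("node:fs");

// file.txt.gz is assumed to be a gzip-compressed file
const readStream = fs.createReadStream("file.txt.gz");
const gunzip = zlib.createGunzip();

// pipe() returns the gunzip stream, whose readable side emits the decompressed data
const uncompressedStream = readStream.pipe(gunzip);
uncompressedStream.pipe(process.stdout);
```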
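`createGunzip()` returns a Transform stream, so you can also write to it and read from it directly. This in-memory sketch needs no `file.txt.gz`, and `gunzipToString()` is a hypothetical helper. It compresses a string with `zlib.gzipSync()`, then reads the decompressed result back out of the gunzip stream's readable side.

```javascript
const zlib = require("node:zlib");

// Hypothetical helper: decompress a gzip Buffer by treating gunzip as a Readable
async function gunzipToString(compressed) {
  const gunzip = zlib.createGunzip();
  gunzip.end(compressed); // feed the writable side and signal there's no more input
  const parts = [];
  // ...then consume the readable side like any other Readable stream
  for await (const chunk of gunzip) {
    parts.push(chunk);
  }
  return Buffer.concat(parts).toString("utf8");
}

const done = gunzipToString(zlib.gzipSync("Hello, compressed world!"));
done.then((text) => console.log(text)); // Hello, compressed world!
```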
Encryption
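Finally, for our last example, we can use the `crypto` module to create streams for cryptographic operations. Like the `zlib` streams, a `Decipher` is both writable and readable (a Transform stream): you write ciphertext in and read plaintext out: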
```javascript
const crypto = require("node:crypto");

const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const decipher = crypto.createDecipheriv("aes-256-cbc", key, iv);
// Use decipher as a Readable stream: write ciphertext to it, read plaintext from it
```
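This minimal sketch shows both sides of these Transform streams without touching the file system. It writes plaintext into a `Cipher`, reads the ciphertext from its readable side, and feeds that ciphertext into a `Decipher`. `readAll()` is a hypothetical helper. The algorithm and the random key and IV mirror the snippet above.

```javascript
const crypto = require("node:crypto");

// Hypothetical helper: drain any Readable into a single Buffer
async function readAll(stream) {
  const parts = [];
  for await (const chunk of stream) {
    parts.push(chunk);
  }
  return Buffer.concat(parts);
}

const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const done = (async () => {
  const cipher = crypto.createCipheriv("aes-256-cbc", key, iv);
  cipher.end("This is a secret message"); // write plaintext, then close the writable side
  const encrypted = await readAll(cipher); // read ciphertext from the readable side

  const decipher = crypto.createDecipheriv("aes-256-cbc", key, iv);
  decipher.end(encrypted);
  return (await readAll(decipher)).toString("utf8");
})();

done.then((text) => console.log(text)); // This is a secret message
```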
For a more complete example that encrypts a message and then decrypts it with streams:
```javascript
const crypto = require("node:crypto");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

// Generate a random key and IV
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const algorithm = "aes-256-cbc";

// Create some sample encrypted data as raw bytes
const cipher = crypto.createCipheriv(algorithm, key, iv);
const encrypted = Buffer.concat([
  cipher.update("This is a secret message", "utf8"),
  cipher.final(),
]);

// Write the encrypted bytes to a file (synchronously, to keep the example short)
fs.writeFileSync("encrypted.bin", encrypted);

// Create a decipher (a Transform stream: ciphertext in, plaintext out)
const decipher = crypto.createDecipheriv(algorithm, key, iv);

// Create a Readable stream from the encrypted file (no encoding: we want raw Buffers)
const readStream = fs.createReadStream("encrypted.bin");

// Create a Writable stream for the decrypted output
const writeStream = fs.createWriteStream("decrypted.txt");

// Use pipeline to decrypt the data
async function decryptFile() {
  try {
    await pipeline(readStream, decipher, writeStream);
    console.log("File successfully decrypted");
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}

decryptFile();
```
This last example is a little tough to follow (I've synchronously written the encrypted data to a file), but here is a sequence diagram to help:
A sequence diagram of how the above code executes
Readable stream fundamentals in action
We're going to cap off this blog post by recapping some of the core stream concepts we covered in the previous post Understanding Node.js Streams.
The following will be a quick-fire round of examples to help you get comfortable with the concepts we have covered.
```javascript
const { Readable, Writable } = require("node:stream");

// Create a custom Readable stream
class NumberStream extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 1;
  }

  _read() {
    if (this.current <= this.max) {
      const data = { number: this.current };
      // Object mode can push the object itself; byte mode needs a string or Buffer
      this.push(this.readableObjectMode ? data : JSON.stringify(data));
      this.current++;
    } else {
      this.push(null); // Signal the end of the stream
    }
  }
}

// Create instances of our custom stream
const numberStream = new NumberStream(5, {
  objectMode: false,
  highWaterMark: 64, // Buffer size in bytes
});

const objectStream = new NumberStream(5, {
  objectMode: true,
  highWaterMark: 2, // Buffer size in objects
});

// Example 1: Using the 'readable' event (paused mode)
numberStream.on("readable", () => {
  let chunk;
  while (null !== (chunk = numberStream.read())) {
    console.log("Received chunk:", chunk.toString());
  }
});

numberStream.on("end", () => {
  console.log("Stream ended");
});

// Example 2: Using flowing mode
objectStream.on("data", (chunk) => {
  console.log("Received object:", chunk);
});

objectStream.on("end", () => {
  console.log("Object stream ended");
});

// Example 3: Demonstrating backpressure
let i = 0; // kept outside read() so each call continues where the last one stopped
const fastProducer = new Readable({
  highWaterMark: 64, // small buffer so backpressure kicks in quickly
  read() {
    while (i < 1000) {
      // push() returns false once the internal buffer reaches highWaterMark
      if (!this.push(`Data ${i++}\n`)) {
        console.log("Backpressure applied");
        return; // stop until the stream calls read() again
      }
    }
    this.push(null);
  },
});

// A pipe() destination must be Writable
const slowConsumer = new Writable({
  highWaterMark: 64,
  write(chunk, encoding, callback) {
    console.log("Slow consumer received:", chunk.toString().trim());
    setTimeout(callback, 1); // simulate slow processing
  },
});

// Piping with backpressure handling
fastProducer.pipe(slowConsumer);
```
In the above example, we cover the following concepts:
- Buffering: The `highWaterMark` option passed when creating each `NumberStream` controls the size of its internal buffer.
- Backpressure: In Example 3, we demonstrate backpressure between a fast producer and a slow consumer.
- Modes of Operation:
  - Example 1 uses the paused mode, manually calling `read()` inside a `readable` event handler.
  - Example 2 uses the flowing mode with the `data` event.
- Piping: Example 3 uses `pipe()` to connect the fast producer to the slow consumer.
- Events: We use the `readable`, `data`, and `end` events to interact with the streams.
- Object Mode: The `objectStream` is created with `objectMode: true`, allowing it to emit JavaScript objects.
On top of this:
- The `NumberStream` class demonstrates how to create a custom Readable stream.
- The `_read()` method is called internally by the stream whenever it wants more data.
- In object mode, we push JavaScript objects directly. In non-object mode, we `JSON.stringify()` the objects first, because byte-mode streams only accept strings, Buffers, or Uint8Arrays.
- The backpressure example shows how a fast producer pauses when the consumer's buffer fills up.
- Piping automatically handles backpressure and data flow between streams.
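The buffering and backpressure points above boil down to one signal: `push()` returns `false` once the internal buffer reaches `highWaterMark`. This minimal sketch pushes into an object-mode stream that nobody is reading yet, so you can see the signal flip. The two `{ n }` objects are arbitrary.

```javascript
const { Readable } = require("node:stream");

// In object mode, highWaterMark counts objects rather than bytes
const stream = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read() {}, // no-op: we push manually below
});

// Nothing is consuming yet, so each push lands in the internal buffer
const first = stream.push({ n: 1 }); // 1 buffered, below the mark
const second = stream.push({ n: 2 }); // 2 buffered, mark reached

console.log(first, second, stream.readableLength); // true false 2
```

A well-behaved producer treats that `false` as "stop pushing until `_read()` is called again", which is exactly what the `fastProducer` in Example 3 does.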
Conclusion
Readable streams in Node.js offer a powerful and flexible way to handle data in chunks, making it easier to process large datasets efficiently. By understanding the various ways to create Readable streams, such as `stream.Readable.from()`, the `stream.Readable` class, and `stream.Readable.fromWeb()`, you can build robust data processing applications.
We also explored several practical examples, including piping streams, handling HTTP requests, working with child processes, and performing compression and encryption. Each of these examples highlights the versatility of readable streams and their applicability in different scenarios.
As you continue to work with Node.js streams, remember the key concepts of buffering, backpressure, and the importance of event handling to manage your streams effectively. It all takes practice.
For further reading and more advanced use cases, be sure to check out the official Node.js documentation and other related resources. We'll be looking into Writable streams next time!
Resources and further reading
Disclaimer: This blog post used AI to generate the images used for the analogy.
Photo credit: terminath0r