Writable Streams in Node.js
Published: Jul 10, 2024
Last updated: Jul 10, 2024
Overview
Previously, we covered everything we need to know to understand Node.js streams, followed by a look into Readable streams.
In this post, we move on to Writable streams.
How to think about Writable streams
Previously, we stated that a mental model for remembering Writable streams is to compare them to a water barrel (or a water bucket if that works better for you).
The happy-go-lucky water barrel: our mental starting point for Writable streams
Here is a little more depth to the analogy and things that you need to know:
- Bucket (Stream Buffer): The bucket represents the internal buffer of a Writable stream. It has a finite capacity to hold water (data).
- Pouring Water (Writing Data): When you write data to a stream, it's like pouring water into the bucket. The write() method is analogous to pouring water.
- Bucket Capacity (highWaterMark): The bucket has a maximum capacity, similar to the highWaterMark in Writable streams. This is the point at which the bucket is considered "full".
- Overflow (Backpressure): If you keep pouring once the bucket is full, it would overflow. In streams, this is where backpressure comes in: write() returns false once the buffered data reaches highWaterMark. The data is still buffered rather than lost, but the return value is your signal to stop writing until the stream catches up.
- Draining the Bucket (Processing Data): As the stream processes data, it's like water draining from the bucket. Once the buffered data has been processed after write() returned false, the stream emits a 'drain' event to signal that it is safe to write again.
- Continuous Flow: Imagine a system where water is constantly poured in and drained out. This is a Writable stream in its steady state, where data is written and processed continuously. (Flowing mode itself is a Readable stream concept.)
- Pausing the Flow: Writable streams have no pause() method; that method belongs to Readable streams. To pause the flow, the producer stops calling write() after it returns false and waits for 'drain'. You can also call cork() to hold writes in the buffer until uncork() is called. This manual control is useful when you need to regulate the flow of data.
- Sealing the bucket (Ending the Stream): In a Writable stream, calling end() is like saying "This is the last bit of data I'm adding to the bucket, and once it's processed, consider the operation complete." It's more about finalizing the writing process than "closing a source".
- Bucket Emptied (Finish Event): When all the water has drained from the sealed bucket, it's empty. This is analogous to the 'finish' event in a Writable stream, which is emitted after end() has been called and all data has been processed.
- Leaks (Error Handling): This is more for a mental model, but if there's a leak in the bucket, it's like an error in the stream. Proper error handling is crucial in both cases.
- Custom Buckets (Custom Writable Streams): Just as you might have specialized buckets for different liquids, you can create custom Writable streams for specific data types or operations. This is done by extending the stream.Writable class. See the section on extending the stream.Writable class for more detail.
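To make the pouring, overflow, draining and sealing steps concrete, here is a minimal sketch. The names slowSink, received and pourAll are made up for illustration, and the 4-byte highWaterMark and 10ms delay are arbitrary values chosen so that backpressure shows up quickly.

```javascript
const { Writable } = require("node:stream");
const { once } = require("node:events");

// A deliberately slow bucket with a tiny capacity (4 bytes).
const received = [];
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 10); // pretend each chunk takes 10ms to process
  },
});

// Pour chunks in one at a time, pausing whenever the bucket reports it is full.
async function pourAll(chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    const hasRoom = slowSink.write(chunk);
    if (!hasRoom) {
      waits += 1;
      await once(slowSink, "drain"); // wait for the bucket to drain
    }
  }
  slowSink.end(); // seal the bucket
  await once(slowSink, "finish"); // every chunk has been processed
  return waits;
}

const done = pourAll(["aaaa", "bbbb", "cccc"]).then((waits) => {
  console.log(`Waited for 'drain' ${waits} time(s)`, received);
});
```

The producer never loses data here; it simply stops pouring whenever write() returns false and resumes on 'drain'.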
Here is a lifecycle sequence diagram to help you visualize the analogy:
Our water bucket/barrel analogy in action
How to create Writable streams
The node:stream module provides a Writable class which you use to create Writable streams. There are three ways to use it:
- Extending the stream.Writable class
- Creating an instance using the stream.Writable class
- Using a factory method from stream.Writable
Extending the stream.Writable class
In this scenario, you create your own class and provide a _write method for handling the data processing.

    const { Writable } = require("node:stream");

    class MyWritable extends Writable {
      _write(chunk, encoding, callback) {
        // Implementation
      }
    }
Creating an instance using the stream.Writable class
Alternatively, we can just create an instance of a Writable stream directly.
This is similar to extending the class, but we provide our implementation for processing data directly as the write option.

    const { Writable } = require("node:stream");

    const writable = new Writable({
      write(chunk, encoding, callback) {
        // Implementation
      },
    });
For example, we could create a LogWriter class that logs the chunks as it processes data:

    const { Writable } = require("node:stream");

    class LogWriter extends Writable {
      _write(chunk, encoding, callback) {
        console.log(`[${new Date().toISOString()}] ${chunk.toString()}`);
        callback(); // signal that this chunk has been processed
      }
    }

    const logger = new LogWriter();

    logger.write("Application started");
    logger.write("User logged in");
    logger.end("Application closed");
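The callback passed to _write is also how a custom stream reports failure: calling it with an error makes the stream emit 'error'. The sketch below illustrates this with a hypothetical JsonOnlyWriter class (not part of the examples above) that rejects any chunk that is not valid JSON.

```javascript
const { Writable } = require("node:stream");

// Hypothetical sink that only accepts chunks containing valid JSON.
class JsonOnlyWriter extends Writable {
  constructor(options) {
    super(options);
    this.records = [];
  }

  _write(chunk, encoding, callback) {
    try {
      this.records.push(JSON.parse(chunk.toString()));
      callback(); // success: ready for the next chunk
    } catch (err) {
      callback(err); // failure: the stream emits 'error' and is destroyed
    }
  }
}

const writer = new JsonOnlyWriter();
const errors = [];

// Without an 'error' listener, the error would surface as an uncaught exception.
writer.on("error", (err) => errors.push(err.message));
writer.on("close", () => {
  console.log(`Stored ${writer.records.length} record(s), saw ${errors.length} error(s)`);
});

writer.write('{"user":"ada"}');
writer.write("not json");
```

Once a Writable has errored it is destroyed by default, so a real application would typically create a fresh stream rather than keep writing to this one.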
Using a factory method from stream.Writable

    const stream = require("node:stream");

    // Writable can also be called without the new keyword
    const writable = stream.Writable({
      write(chunk, encoding, callback) {
        // Implementation
      },
    });
Other examples of Writable streams
In addition to using the stream.Writable class directly, Node.js core modules and some third-party libraries provide ready-made Writable streams for common use cases:
- fs.createWriteStream
- http response objects
- process.stdout and process.stderr
- child_process stdin
- Compression streams
- Crypto streams
- Database write streams (e.g. MongoDB)
fs.createWriteStream
With fs.createWriteStream, you can write data to a file using a Writable stream.

    const fs = require("node:fs");

    const writableStream = fs.createWriteStream("output.txt");

For example, we could create a log file:

    const fs = require("node:fs");

    // The "a" flag appends to access.log instead of truncating it
    const writableStream = fs.createWriteStream("access.log", { flags: "a" });

    function logAccess(ip, route) {
      writableStream.write(`${new Date().toISOString()} ${ip} ${route}\n`);
    }

    logAccess("192.168.1.1", "/home");
    logAccess("192.168.1.2", "/login");
http response objects
The res or "response" argument in an HTTP server callback is a Writable stream.

    const http = require("node:http");

    const server = http.createServer((req, res) => {
      // res is a writable stream
      res.write("Hello");
      res.end("World");
    });

We could use it for streaming back simple responses to the client:

    const http = require("node:http");

    const server = http.createServer((req, res) => {
      res.writeHead(200, { "Content-Type": "text/html" });
      res.write("<html><body>");
      res.write("<h1>Welcome to my server!</h1>");
      res.write(`<p>You requested: ${req.url}</p>`);
      res.end("</body></html>");
    });

    server.listen(3000, () => console.log("Server running on port 3000"));
In the above example, we are streaming an HTML response to the client.
process.stdout and process.stderr
Both process.stdout and process.stderr are Writable streams that write to the console.

    process.stdout.write("Log to console\n");
    process.stderr.write("Log error to console\n");
child_process stdin
child_process provides a way to spawn child processes and interact with them using stdin, stdout and stderr.

    const { spawn } = require("node:child_process");

    const child = spawn("wc");

    // child.stdin is a writable stream
    child.stdin.write("Hello World");
    child.stdin.end();

For example, we could use child_process to spawn a grep process that searches for the word "hello" in the input stream and logs the output to the console:

    const { spawn } = require("node:child_process");

    const child = spawn("grep", ["hello"]);

    child.stdout.on("data", (data) => {
      console.log(`grep found: ${data}`);
    });

    child.stdin.write("hello world\n");
    child.stdin.write("goodbye world\n");
    child.stdin.end(); // closing stdin lets grep finish
Compression streams
Although we haven't covered Transform streams in depth yet, if you've followed along with the series so far then it will be straightforward to understand how they work.
In the case of zlib, we can create a gzip compression Transform stream that is both readable and writable:

    const zlib = require("node:zlib");

    // gzip is a transform stream (both readable and writable) - we cover this more in a future post
    const gzip = zlib.createGzip();

Here is an example of this in action, where we compress a file using gzip compression and write it out to a new file:

    const fs = require("node:fs");
    const zlib = require("node:zlib");

    const readStream = fs.createReadStream("input.txt");
    const writeStream = fs.createWriteStream("input.txt.gz");
    const gzip = zlib.createGzip();

    readStream.pipe(gzip).pipe(writeStream);

    writeStream.on("finish", () => console.log("File successfully compressed"));
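One caveat with chaining pipe() calls is that an error in one stage is not forwarded to the others. As a hedged alternative, the sketch below uses pipeline() from node:stream/promises, which rejects if any stage fails and cleans up all of the streams. The gzipFile helper and the temporary file paths are made up for illustration.

```javascript
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");
const zlib = require("node:zlib");
const { pipeline } = require("node:stream/promises");

// Hypothetical helper: gzip one file into another.
async function gzipFile(source, destination) {
  // pipeline() rejects if any stage errors and destroys every stream on failure.
  await pipeline(
    fs.createReadStream(source),
    zlib.createGzip(),
    fs.createWriteStream(destination)
  );
}

// Usage with a throwaway file in the OS temp directory (illustrative paths).
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "gzip-demo-"));
const source = path.join(dir, "input.txt");
fs.writeFileSync(source, "hello streams\n".repeat(1000));

const compressed = gzipFile(source, `${source}.gz`)
  .then(() => console.log("File successfully compressed"))
  .catch((err) => console.error("Compression failed:", err.message));
```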
Crypto streams
Similar to above, cipher in our example below is a Transform stream that is both readable and writable.

    const crypto = require("node:crypto");

    // crypto.createCipher() is deprecated and has been removed from recent Node.js
    // releases, so we use createCipheriv() with an explicit key and IV instead
    const key = crypto.scryptSync("secret", "salt", 24);
    const iv = crypto.randomBytes(16);
    // cipher is a transform stream (both readable and writable) - we cover this more in a future post
    const cipher = crypto.createCipheriv("aes-192-cbc", key, iv);

Here is an example of this in action, where we encrypt a file using AES-192-CBC encryption and write it out to a new file:

    const crypto = require("node:crypto");
    const fs = require("node:fs");

    const algorithm = "aes-192-cbc";
    const password = "Password used to generate key";
    const key = crypto.scryptSync(password, "salt", 24);
    // An all-zero IV keeps the example short; use a random IV in real code
    const iv = Buffer.alloc(16, 0);

    const cipher = crypto.createCipheriv(algorithm, key, iv);
    const input = fs.createReadStream("input.txt");
    const output = fs.createWriteStream("encrypted.txt");

    input.pipe(cipher).pipe(output);

    output.on("finish", () => console.log("File encrypted successfully"));
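To check that an encrypted stream can be read back, here is a rough round-trip sketch that runs entirely in memory. The collector and roundTrip helpers are hypothetical names introduced for this example, and it assumes a randomly generated IV that would be stored alongside the ciphertext.

```javascript
const crypto = require("node:crypto");
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical helper: a Writable that gathers everything written to it.
function collector() {
  const chunks = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  sink.result = () => Buffer.concat(chunks);
  return sink;
}

async function roundTrip(plaintext, password) {
  const key = crypto.scryptSync(password, "salt", 24); // 24 bytes for aes-192
  const iv = crypto.randomBytes(16); // fresh IV per message

  const encrypted = collector();
  await pipeline(
    Readable.from([Buffer.from(plaintext)]),
    crypto.createCipheriv("aes-192-cbc", key, iv),
    encrypted
  );

  const decrypted = collector();
  await pipeline(
    Readable.from([encrypted.result()]),
    crypto.createDecipheriv("aes-192-cbc", key, iv),
    decrypted
  );
  return decrypted.result().toString();
}

const result = roundTrip("hello streams", "Password used to generate key");
result.then((text) => console.log(text));
```

The cipher and decipher sit in the middle of each pipeline as Transform streams, while the collector plays the role of the Writable at the end.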
Database write streams
Finally, some databases provide Writable streams for writing data. For example, MongoDB provides a GridFSBucket class for working with large files.

    const mongodb = require("mongodb");

    // db is assumed to be an already connected Db instance
    const bucket = new mongodb.GridFSBucket(db);
    const uploadStream = bucket.openUploadStream("file.txt");

Here is a more complete example:

    const { MongoClient, GridFSBucket } = require("mongodb");
    const fs = require("node:fs");

    async function uploadFile(filePath, fileName) {
      const client = await MongoClient.connect("mongodb://localhost:27017");
      try {
        const db = client.db("myDatabase");
        // GridFSBucket is exported by the mongodb package, not by MongoClient
        const bucket = new GridFSBucket(db);
        const uploadStream = bucket.openUploadStream(fileName);
        const readStream = fs.createReadStream(filePath);

        readStream.pipe(uploadStream);

        await new Promise((resolve, reject) => {
          uploadStream.on("finish", resolve);
          uploadStream.on("error", reject);
          readStream.on("error", reject);
        });
      } finally {
        await client.close();
      }
    }

    uploadFile("largefile.zip", "backup.zip")
      .then(() => console.log("File uploaded successfully"))
      .catch(console.error);

You can also stream writes to remote destinations such as S3 buckets.
Writable stream fundamentals in action
To cap off today's blog post, let's take a look at a Writable stream that exercises the fundamentals covered earlier in the series (backpressure, buffering, etc.).

    const { Readable, Writable } = require("node:stream");
    const fs = require("node:fs");

    class SlowWriter extends Writable {
      constructor(options = {}) {
        // 1. Stream types: This is a Writable stream
        // 2. Buffering: We're setting a custom highWaterMark. It must be passed to
        //    super(); assigning this.highWaterMark afterwards has no effect
        // 7. Object Mode: We're enabling object mode
        super({ highWaterMark: 2, ...options, objectMode: true });
        this.outputFile = fs.createWriteStream("output.txt", { flags: "a" });
      }

      _write(chunk, encoding, callback) {
        // Simulate slow writing
        setTimeout(() => {
          // 7. Object Mode: We can write objects directly
          const output = `${JSON.stringify(chunk)}\n`;
          // 3. Backpressure: This slow write operation may cause backpressure
          this.outputFile.write(output, (err) => {
            if (err) {
              callback(err);
            } else {
              console.log("Chunk written");
              callback();
            }
          });
        }, 1000);
      }

      _final(callback) {
        // Close the underlying file before 'finish' is emitted
        this.outputFile.end(callback);
      }
    }

    // Usage:
    const slowWriter = new SlowWriter({ highWaterMark: 2 });

    // 6. Events: Listening for events
    slowWriter.on("finish", () => console.log("All writes are complete."));
    slowWriter.on("drain", () =>
      console.log("Stream is no longer full, can resume writing.")
    );

    // 5. Piping: Creating a readable stream to pipe into our writable
    const fastReader = new Readable({
      objectMode: true,
      read() {},
    });

    // 4. Modes of Operation: This is in flowing mode due to the pipe
    fastReader.pipe(slowWriter);

    // Simulate fast data production
    for (let i = 0; i < 10; i++) {
      // 3. Backpressure: push() returns false once the readable's own buffer is full
      const canContinue = fastReader.push({ data: `Fast data ${i}` });
      if (!canContinue) {
        console.log("Readable buffer is full");
        // In a real scenario, you'd stop pushing until read() is called again
      }
    }

    fastReader.push(null); // Signal the end of the readable stream
In the above example, we demonstrate:
- Stream types: It creates a custom Writable stream (SlowWriter) and uses a Readable stream (fastReader).
- Buffering: The highWaterMark is set to control the internal buffer size.
- Backpressure: The slow writing operation simulated by setTimeout may cause backpressure, which is handled by the stream automatically.
- Modes of Operation: The readable stream is put into flowing mode when piped to the writable stream.
- Piping: The readable stream is piped to the writable stream, automatically managing data flow.
- Events: The code listens for 'finish' and 'drain' events on the writable stream.
- Object Mode: Both the readable and writable streams are set to object mode, allowing them to work with JavaScript objects.
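In the example, the loop pushes all ten objects up front regardless of how fast the writer is. As a sketch of what waiting for demand could look like, the Readable below only produces data inside read() and stops as soon as push() returns false. The createCounter helper, the seen array and the 10ms delay are assumptions made for illustration.

```javascript
const { Readable, Writable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical producer: creates objects only when the stream asks for them.
function createCounter(limit) {
  let i = 0;
  return new Readable({
    objectMode: true,
    highWaterMark: 2,
    read() {
      // Keep pushing until the buffer is full or the data runs out.
      while (i < limit) {
        const hasRoom = this.push({ data: `Fast data ${i}` });
        i += 1;
        if (!hasRoom) return; // buffer full: read() is called again once there is room
      }
      this.push(null); // no more data
    },
  });
}

const seen = [];
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(chunk, encoding, callback) {
    seen.push(chunk.data);
    setTimeout(callback, 10); // simulate slow processing
  },
});

const finished = pipeline(createCounter(10), slowSink).then(() => {
  console.log(`Processed ${seen.length} objects`);
});
```

With this shape, the slow writer sets the pace: the producer is only asked for more data when there is room for it.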
For a visual guide on the above:
A sequence diagram of our SlowWriter in action
Conclusion
Understanding Writable streams in Node.js is crucial for managing data flow in your applications. By drawing parallels to everyday concepts like water barrels and buckets, we've explored how Writable streams handle data, manage backpressure, and process data efficiently. We've also demonstrated various ways to implement and utilize Writable streams, from logging and file writing to more advanced use cases like compression and encryption.
As you continue your journey with Node.js streams, remember that mastering these concepts can greatly enhance your ability to build robust, high-performance applications. Whether you're handling large file uploads, streaming data to a client, or processing logs, Writable streams offer a powerful toolset for efficient data handling. Keep experimenting with different stream types and explore the endless possibilities they provide.
Thank you for following along with this series on Node.js streams. Stay tuned for our next post, where we'll dive into Transform streams and further expand our understanding of Node.js stream fundamentals.
Disclaimer: This blog post used AI to generate the images used for the analogy.
Photo credit: robanderson72