Writable Streams in Node.js

Published: Jul 10, 2024

Last updated: Jul 10, 2024

Overview

Previously, we covered everything we need to know to understand Node.js streams, followed by a look into Readable streams.

In this post, we move onto Writable streams."

How to think about Writable streams

Previously, we started that a mental model for remembering Writable streams is to compare them to a water barrel (or a water bucket if that works better for you).

The happy-go-lucky water barrel: our mental starting point for Readable streams

The happy-go-lucky water barrel: our mental starting point for Readable streams

Here is a little more depth to the analogy and things that you need to know:

  • Bucket (Stream Buffer): The bucket represents the internal buffer of a Writable stream. It has a finite capacity to hold water (data).
  • Pouring Water (Writing Data): When you write data to a stream, it's like pouring water into the bucket. The write() method is analogous to pouring water.
  • Bucket Capacity (highWaterMark): The bucket has a maximum capacity, similar to the highWaterMark in Writable streams. This is the point at which the bucket is considered "full".
  • Overflow (Backpressure): If you try to pour more water when the bucket is already full, it will overflow. In streams, this triggers backpressure, and the write() method returns false.
  • Draining the Bucket (Processing Data): As the stream processes data, it's like water draining from the bucket. When enough water has drained (data processed), the stream emits a 'drain' event.
  • Continuous Flow (Flowing Mode): Imagine a system where water is constantly poured in and drained out. This is similar to a stream in flowing mode, where data is written and processed continuously.
  • Pausing the Flow (Paused Mode): If you pause the water flow, it's like pausing a stream. The pause() method is used to pause the stream. This manual process is useful when you need to control the flow of data.
  • Sealing the bucket (Ending the Stream): In a Writable stream, calling end() is like saying "This is the last bit of data I'm adding to the bucket, and once it's processed, consider the operation complete." It's more about finalizing the writing process than "closing a source".
  • Bucket Emptied (Finish Event): When all the water has drained from the bucket, it's empty. This is analogous to the 'finish' event in a Writable stream, indicating all data has been processed.
  • Leaks (Error Handling): This is more for a mental model, but if there's a leak in the bucket, it's like an error in the stream. Proper error handling is crucial in both cases.
  • Custom Buckets (Custom Writable Streams): Just as you might have specialized buckets for different liquids, you can create custom Writable streams for specific data types or operations. This is done by extending a class from stream.Writable. See the section on stream.Writable class for more clarification.

Here is a lifecycle sequence diagram to help you visualize the analogy:

Our water bucket/barrel analogy in action

Our water bucket/barrel analogy in action

How to create Writable streams

The node:stream module provides a Writable class which you use to create Writable streams.

  1. Extending the stream.Writable class
  2. Creating an instance using the stream.Writable class
  3. Using a factory method from stream.Writable

Extending the stream.Writable class

In this scenario, you create your own class and provide a _write method for handling the data processing.

const { Writable } = require("node:stream"); class MyWritable extends Writable { _write(chunk, encoding, callback) { // Implementation } }

Creating an instance using the stream.Writable class

Alternative, we can just create an instance of a Writable stream directly.

This is similar to extending a class, but we make our implementation for processing data directly in the write method.

const { Writable } = require("node:stream"); const writable = new Writable({ write(chunk, encoding, callback) { // Implementation }, });

For example, we could create a LogWriter class that logs the chunks as it processes data:

const { Writable } = require("node:stream"); class LogWriter extends Writable { _write(chunk, encoding, callback) { console.log(`[${new Date().toISOString()}] ${chunk.toString()}`); callback(); } } const logger = new LogWriter(); logger.write("Application started"); logger.write("User logged in"); logger.end("Application closed");

Using a factory method from stream.Writable

const stream = require("node:stream"); const writable = stream.Writable({ write(chunk, encoding, callback) { // Implementation }, });

Other examples of Writable streams

In addition to using the stream.Writable class, some Node.js modules provide several built-in Writable streams for common use cases:

  1. fs.createWriteStream
  2. http response objects
  3. process.stdout and process.stderr
  4. child_process stdin
  5. Compression streams
  6. Crypto streams
  7. Database write streams (e.g. MongoDB)

fs.createWriteStream

With fs.createWriteStream, you can write data to a file using a Writable stream.

const fs = require("node:fs"); const writableStream = fs.createWriteStream("output.txt");

For example, we could create a log file:

const fs = require("node:fs"); const writableStream = fs.createWriteStream("access.log", { flags: "a" }); function logAccess(ip, route) { writableStream.write(`${new Date().toISOString()} ${ip} ${route}\n`); } logAccess("192.168.1.1", "/home"); logAccess("192.168.1.2", "/login");

http response objects

The res or "response" argument in an HTTP server callback is a Writable stream.

const http = require("node:http"); const server = http.createServer((req, res) => { // res is a writable stream res.write("Hello"); res.end("World"); });

We could use it for streaming back simple responses to the client:

const http = require("node:http"); const server = http.createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/html" }); res.write("<html><body>"); res.write("<h1>Welcome to my server!</h1>"); res.write(`<p>You requested: ${req.url}</p>`); res.end("</body></html>"); }); server.listen(3000, () => console.log("Server running on port 3000"));

In the above example, we are streaming an HTML response to the client.

process.stdout and process.stderr

Both process.stdout and process.stderr are Writable streams that write to the console.

process.stdout.write("Log to console\n"); process.stderr.write("Log error to console\n");

child_process stdin

child_process provides a way to spawn child processes and interact with them using stdin, stdout and stderr.

const { spawn } = require("node:child_process"); const child = spawn("wc"); // child.stdin is a writable stream child.stdin.write("Hello World"); child.stdin.end();

For example, we could use child_process to spawn a grep process that searches for the word "hello" in the input stream and logs the output to the console:

const { spawn } = require("node:child_process"); const child = spawn("grep", ["hello"]); child.stdout.on("data", (data) => { console.log(`grep found: ${data}`); }); child.stdin.write("hello world\n"); child.stdin.write("goodbye world\n"); child.stdin.end();

Compression streams

Although we haven't covered Transform streams in-depth yet, if you've followed along with the series so far then it will be come trivial to understand how they work.

In the case of zlib, we can create a gzip compression Transform stream that is both readable and writable:

const zlib = require("node:zlib"); const gzip = zlib.createGzip(); // gzip is a transform stream (both readable and writable) - we cover this more in a future post

An example of this in action where we compress a file using gzip compression and write it out to a new file:

const fs = require("node:fs"); const zlib = require("node:zlib"); const readStream = fs.createReadStream("input.txt"); const writeStream = fs.createWriteStream("input.txt.gz"); const gzip = zlib.createGzip(); readStream.pipe(gzip).pipe(writeStream); writeStream.on("finish", () => console.log("File successfully compressed"));

Crypto streams

Similar to above, cipher in our below example is a Transform stream that is both readable and writable.

const crypto = require("crypto"); const cipher = crypto.createCipher("aes192", "secret"); // cipher is a transform stream (both readable and writable) - we cover this more in a future post

An example of this in action where we encrypt a file using AES-192-CBC encryption and write it out to a new file:

const crypto = require("node:crypto"); const fs = require("node:fs"); const algorithm = "aes-192-cbc"; const password = "Password used to generate key"; const key = crypto.scryptSync(password, "salt", 24); const iv = Buffer.alloc(16, 0); const cipher = crypto.createCipheriv(algorithm, key, iv); const input = fs.createReadStream("input.txt"); const output = fs.createWriteStream("encrypted.txt"); input.pipe(cipher).pipe(output); output.on("finish", () => console.log("File encrypted successfully"));

Database write streams

Finally, some databases provide Writable streams for writing data. For example, MongoDB provides a GridFSBucket class for working with large files.

const mongodb = require("mongodb"); const bucket = new mongodb.GridFSBucket(db); const uploadStream = bucket.openUploadStream("file.txt");

Here is a more complete example:

const { MongoClient } = require("mongodb"); const fs = require("node:fs"); async function uploadFile(filePath, fileName) { const client = await MongoClient.connect("mongodb://localhost:27017"); const db = client.db("myDatabase"); const bucket = new MongoClient.GridFSBucket(db); const uploadStream = bucket.openUploadStream(fileName); const readStream = fs.createReadStream(filePath); readStream.pipe(uploadStream); return new Promise((resolve, reject) => { uploadStream.on("finish", resolve); uploadStream.on("error", reject); }); } uploadFile("largefile.zip", "backup.zip") .then(() => console.log("File uploaded successfully")) .catch(console.error);

You can also write streams to remote sources like S3 buckets, etc.

Writable stream fundamentals in action

To cap off today's blog post, let's take a look at a Writable stream that covers the fundamentals that we have covered in the series previously (backpressure, buffering etc.).

const { Writable } = require("stream"); const fs = require("fs"); class SlowWriter extends Writable { constructor(options) { // 1. Stream types: This is a Writable stream // 7. Object Mode: We're enabling object mode super({ ...options, objectMode: true }); // 2. Buffering: We're setting a custom highWaterMark this.highWaterMark = options.highWaterMark || 2; this.outputFile = fs.createWriteStream("output.txt", { flags: "a" }); } _write(chunk, encoding, callback) { // Simulate slow writing setTimeout(() => { // 7. Object Mode: We can write objects directly const output = `${JSON.stringify(chunk)}\n`; // 3. Backpressure: This slow write operation may cause backpressure this.outputFile.write(output, (err) => { if (err) { callback(err); } else { console.log("Chunk written"); callback(); } }); }, 1000); } _final(callback) { this.outputFile.end(callback); } } // Usage: const slowWriter = new SlowWriter({ highWaterMark: 2 }); // 6. Events: Listening for events slowWriter.on("finish", () => console.log("All writes are complete.")); slowWriter.on("drain", () => console.log("Stream is no longer full, can resume writing.") ); // 5. Piping: Creating a readable stream to pipe into our writable const { Readable } = require("stream"); const fastReader = new Readable({ objectMode: true, read() {}, }); // 4. Modes of Operation: This is in flowing mode due to the pipe fastReader.pipe(slowWriter); // Simulate fast data production for (let i = 0; i < 10; i++) { // 3. Backpressure: This may trigger backpressure due to slow writing const canContinue = fastReader.push({ data: `Fast data ${i}` }); if (!canContinue) { console.log("Backpressure detected, waiting for drain"); // In a real scenario, you'd wait for the 'drain' event before continuing } } fastReader.push(null); // Signal the end of the readable stream

In the above example, we demonstrate:

  1. Stream types: It creates a custom Writable stream (SlowWriter) and uses a Readable stream (fastReader).
  2. Buffering: The highWaterMark is set to control the internal buffer size.
  3. Backpressure: The slow writing operation simulated by setTimeout may cause backpressure, which is handled by the stream automatically.
  4. Modes of Operation: The readable stream is put into flowing mode when piped to the writable stream.
  5. Piping: The readable stream is piped to the writable stream, automatically managing data flow.
  6. Events: The code listens for 'finish' and 'drain' events on the writable stream.
  7. Object Mode: Both the readable and writable streams are set to object mode, allowing them to work with JavaScript objects.

For a visual guide on the above:

A sequence diagram of our SlowWriter in action

A sequence diagram of our SlowWriter in action

Conclusion

Understanding Writable streams in Node.js is crucial for managing data flow in your applications. By drawing parallels to everyday concepts like water barrels and buckets, we've explored how Writable streams handle data, manage backpressure, and process data efficiently. We've also demonstrated various ways to implement and utilize Writable streams, from logging and file writing to more advanced use cases like compression and encryption.

As you continue your journey with Node.js streams, remember that mastering these concepts can greatly enhance your ability to build robust, high-performance applications. Whether you're handling large file uploads, streaming data to a client, or processing logs, Writable streams offer a powerful toolset for efficient data handling. Keep experimenting with different stream types and explore the endless possibilities they provide.

Thank you for following along with this series on Node.js streams. Stay tuned for our next post, where we'll dive into Transform streams and further expand our understanding of Node.js stream fundamentals.

Resources and further reading

Disclaimer: This blog post used AI to generate the images used for the analogy.

Photo credit: robanderson72

Personal image

Dennis O'Keeffe

Byron Bay, Australia

Dennis O'Keeffe

2020-present Dennis O'Keeffe.

All Rights Reserved.