Node.js Streams in Action with the AWS CDK

Published: Jul 14, 2024

Last updated: Jul 14, 2024

Overview

We've come towards the end of the Node.js Streams blog series. Everything we've covered so far will take you a long way in understanding how to work with streams in Node.js.

In this post, we're going to introduce the PassThrough stream type and then take everything that we've learned so far and apply it through a practical (but somewhat contrived) example using the AWS CDK, AWS S3 and AWS Transfer Family.

If you're here because you are most interested in configuring AWS Transfer Family, then jump down to the project implementation.

What are PassThrough streams?

Previously, we covered the four fundamental stream types: Readable, Writable, Duplex and Transform.

So how does stream.PassThrough fit into the picture?

A PassThrough stream in Node.js is a special type of Transform stream that simply passes data through from its writable side to its readable side without modifying it. It's essentially a "do-nothing" transform stream that can be useful in various scenarios.

In general, Node.js docs describe the PassThrough stream's primary purpose as being for examples and testing, but there are also some novel use cases for PassThrough streams.

Some of these novel uses of a PassThrough stream are:

  1. To provide a placeholder for a transform operation that may be added later.
  2. To delay the piping of data from one stream to another.
  3. To observe data passing through a pipeline without altering it.

(3) is particularly useful when you are tying together third-party stream types that you want to pipe through pipelines or when demultiplexing streams. We will be using the PassThrough stream later in our example to do just that.
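As a rough illustration of use (3), here is a minimal sketch that drops a PassThrough into a pipeline purely to observe the bytes flowing through it. The names `observePipeline`, `tap` and `sink` are made up for this example and are not part of the project code.

```typescript
import { PassThrough, Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical helper: pipes chunks through a PassThrough "tap" and
// reports how many bytes the tap observed alongside the final output.
async function observePipeline(
  chunks: string[]
): Promise<{ seenBytes: number; output: string }> {
  let seenBytes = 0;

  // The tap does not modify the data; we only listen in on it.
  const tap = new PassThrough();
  tap.on("data", (chunk: Buffer) => {
    seenBytes += chunk.length;
  });

  // A simple in-memory sink standing in for a real destination.
  const received: Buffer[] = [];
  const sink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      received.push(chunk);
      callback();
    },
  });

  await pipeline(Readable.from(chunks), tap, sink);

  return { seenBytes, output: Buffer.concat(received).toString("utf8") };
}
```

The sink receives exactly what was written, while the tap gives us a place to count, log or measure the data without changing it.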

To be clear, it's not necessarily a tool that you'll use every day.

Full disclosure: I have found some use cases, but you should scrutinize them. There might be a better way to do it.

How to think about PassThrough streams

With our four fundamental stream types, we've used water-related analogies to help understand how streams work in Node.js.

  1. Readable streams: Think of a water tap.
  2. Writable streams: Think of a water bucket.
  3. Duplex streams: A two-way water pipe (with valves to represent the read/write elements).
  4. Transform streams: A pipe going through a water treatment plant.

So how do we think about PassThrough streams?

We can think of them as a clear, transparent water pipe. The water flows from one end to another without any modification.

We think about Node.js PassThrough streams as a clear, transparent water pipe

To extend the analogy:

  • Like a PassThrough stream, a transparent pipe section allows water to flow through without changing it.
  • You can observe the water (data) as it passes through.
  • It can be easily inserted into an existing pipeline without affecting the flow.
  • It provides a point where you could potentially add a "filter" or other "treatment" later if needed.

With that out of the way, let's move on to our project.

Uploading and downloading a CSV file to AWS S3 using the AWS CDK

Our project today is to create an "upload" and "download" script to exemplify some of the concepts we've learned throughout the series.

Here is the overview of what we are building:

  • We will be using the AWS CDK to create an S3 bucket and an AWS Transfer Family server.
  • We will create a special Duplex stream that will act as a demultiplexer.
  • Our upload script will have three pipelines. The first generates a CSV stream from an array of objects, compresses it and feeds it into the demux. The other two read from the demux outputs: one uploads the compressed CSV file to S3, and the other sends it over SFTP to our Transfer Family server (where it ends up in the same bucket).
  • Our download script will have four pipelines. The first downloads our S3 upload, decompresses it and then demultiplexes it. The two receivers from the demux write out a CSV file and a YAML file to disk (just for fun). The fourth pipeline downloads the compressed CSV file via SFTP from our Transfer Family server, decompresses it and then writes it to disk.

You can see the full project code on my GitHub repository.

Prerequisites

It is expected that you are familiar with the following (at least to some extent):

Setting up the project

First, we need to initialize our CDK TypeScript project. We can do that by running the following commands:

$ mkdir demo-stream-to-s3-and-back
$ cd demo-stream-to-s3-and-back
$ npx cdk init app --language typescript

Once the project is initialized, we can install the necessary dependencies for our other scripts:

$ npm i @aws-sdk/client-s3 @aws-sdk/lib-storage fast-csv ssh2-sftp-client yaml
$ npm i --save-dev tsx

Finally, let's configure some files for our project:

$ mkdir src
$ touch src/upload.ts src/download.ts src/

Creating an asymmetric SSH key pair

The first step is to create an asymmetric SSH key pair. We will use the public key for our SFTP server.

From the root directory of our project, run the following:

$ ssh-keygen -t rsa -b 2048 -f sftp_key

Running this code will generate two files: sftp_key and sftp_key.pub. We will use the public key in our CDK code, while the private key will need to be referenced in our local code.

If you are committing your own work, make sure to git ignore these files.

Creating the S3 bucket and Transfer Family server

Next, let's write our AWS CDK infrastructure code.

If you followed my naming convention above and called the project directory demo-stream-to-s3-and-back, then you will have a file lib/demo-stream-to-s3-and-back-stack.ts already created for you.

Update that file with the following:

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as transfer from "aws-cdk-lib/aws-transfer";
import * as iam from "aws-cdk-lib/aws-iam";
import { readFileSync } from "fs";
import * as path from "path";

const SFTP_PUBLIC_KEY = readFileSync(path.join(__dirname, "../sftp_key.pub"));

export class DemoStreamToS3AndBackStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const bucket = new s3.Bucket(this, "StreamToS3AndBackBucket", {
      bucketName: "stream-to-s3-and-back",
      versioned: true,
      encryption: s3.BucketEncryption.S3_MANAGED,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // Create an IAM role for SFTP users
    const sftpUserRole = new iam.Role(this, "SftpUserRole", {
      assumedBy: new iam.ServicePrincipal("transfer.amazonaws.com"),
    });

    // Grant read/write permissions to the S3 bucket
    bucket.grantReadWrite(sftpUserRole);

    // Create the SFTP server
    const server = new transfer.CfnServer(this, "SftpServer", {
      protocols: ["SFTP"],
      identityProviderType: "SERVICE_MANAGED",
      loggingRole: sftpUserRole.roleArn,
    });

    // Create an SFTP user
    const user = new transfer.CfnUser(this, "SftpUser", {
      userName: "testuser",
      serverId: server.attrServerId,
      role: sftpUserRole.roleArn,
      homeDirectory: `/${bucket.bucketName}`,
      sshPublicKeys: [SFTP_PUBLIC_KEY.toString()],
    });

    // Output the SFTP server endpoint
    new cdk.CfnOutput(this, "SftpEndpoint", {
      value: `${server.attrServerId}.server.transfer.${this.region}.amazonaws.com`,
      description: "SFTP Server Endpoint",
    });
  }
}

The above code does the following:

  • Import Necessary Modules: Import AWS CDK libraries, Node.js modules for file handling, and necessary AWS services.
  • Read SFTP Public Key: Read the SFTP public key from a file.
  • Define the Stack Class: Define a class that extends the CDK Stack to encapsulate all resources.
  • Create an S3 Bucket: Create an S3 bucket with versioning, encryption, and a removal policy.
  • Create an IAM Role for SFTP Users: Create an IAM role to be assumed by the SFTP service.
  • Grant Permissions to the S3 Bucket: Grant read and write permissions to the IAM role for the S3 bucket.
  • Create the SFTP Server: Create an SFTP server using AWS Transfer Family with the IAM role for logging.
  • Create an SFTP User: Create an SFTP user associated with the server and provide the SSH public key.
  • Output the SFTP Server Endpoint: Output the SFTP server endpoint URL for connecting to the server.

We are referencing the same S3 bucket we will use in uploads for our SFTP server to simplify things.

Once this is done, we can deploy our stack with the following command:

$ npx cdk deploy

The above step expects that your AWS credentials are set up correctly. If you have not done so, please follow a guide like this one for the AWS CLI.

Make sure to take note of the output for the SFTP server endpoint.

Once that is done, let's move on to the code that interfaces with our infrastructure.

Writing our demultiplexer

We covered a version of multiplexing in our Duplex streams post. In this case, we will be writing a demultiplexer.

Our demultiplexer will take in PassThrough stream types in the constructor and ensure that they are written to when we are processing data.

Let's see how that looks. Add the following to src/demultiplexer.ts:

import { Duplex, PassThrough } from "node:stream";

/**
 * We create a demultiplexer to write out to two different
 * streams.
 *
 * 1. To handle writing the CSV file.
 * 2. To handle writing the YAML file.
 *
 * Reading is handled by the pipeline implementation.
 */
export class Demultiplexer extends Duplex {
  private outputStreams: PassThrough[];

  constructor(outputStreams: PassThrough[]) {
    super({
      objectMode: true,
    });
    this.outputStreams = outputStreams;
  }

  // Forward every incoming chunk to each output stream.
  _write(
    chunk: any,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    for (const outputStream of this.outputStreams) {
      outputStream.write(chunk);
    }
    callback();
  }

  // Once the input has ended, end each output stream too.
  _final(callback: (error?: Error | null) => void): void {
    for (const outputStream of this.outputStreams) {
      outputStream.end();
    }
    callback();
  }
}

Our demultiplexer has implementations for both _write and _final. The _write method writes the chunk to all the output streams, while the _final method ends all the output streams.

Although this is not our use case, here is a simple example of it in use:

import { PassThrough } from "node:stream";
import { Demultiplexer } from "./demultiplexer";

const outputStream1 = new PassThrough();
const outputStream2 = new PassThrough();

const demux = new Demultiplexer([outputStream1, outputStream2]);

This would configure our demultiplexer to write messages to both outputStream1 and outputStream2.

The implementation itself isn't super flexible. In hindsight, you would ideally allow anything that is a Writable stream to be passed in. But it will suffice for our example (and refactoring this could be some nice homework).
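As one possible take on that homework, here is a minimal sketch of a demultiplexer that accepts any Writable and waits for "drain" when an output signals backpressure. The class name `WritableDemultiplexer` and the `fanOutDemo` helper are hypothetical and are not used elsewhere in this project.

```typescript
import { once } from "node:events";
import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical variant of the demultiplexer that takes any Writable.
class WritableDemultiplexer extends Writable {
  constructor(private readonly outputs: Writable[]) {
    super({ objectMode: true });
  }

  _write(
    chunk: unknown,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    // Write to every output and collect a "drain" wait for any output
    // whose internal buffer is full (write() returned false).
    const waits: Promise<unknown>[] = [];
    for (const output of this.outputs) {
      if (!output.write(chunk)) {
        waits.push(once(output, "drain"));
      }
    }
    // Only accept the next chunk once all outputs are ready again.
    Promise.all(waits).then(() => callback(), callback);
  }

  _final(callback: (error?: Error | null) => void): void {
    // End the outputs; this sketch does not wait for them to finish.
    for (const output of this.outputs) {
      output.end();
    }
    callback();
  }
}

// Small demo: fan three chunks out to two in-memory sinks.
async function fanOutDemo(): Promise<string[][]> {
  const collected: string[][] = [[], []];
  const sinks = collected.map(
    (bucket) =>
      new Writable({
        objectMode: true,
        write(chunk, _encoding, callback) {
          bucket.push(String(chunk));
          callback();
        },
      })
  );

  await pipeline(Readable.from(["a", "b", "c"]), new WritableDemultiplexer(sinks));
  return collected;
}
```

Extending Writable rather than Duplex is a design choice for this sketch: the class never produces readable output itself, so the readable side is not needed.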

Writing our upload script

Now that we have our infrastructure set up and our demultiplexer ready, let's write our upload script.

Add the following to src/upload.ts:

import { readFileSync } from "fs";
import { pipeline } from "node:stream/promises";
import { format } from "fast-csv";
import { createGzip } from "node:zlib";
import { S3Client } from "@aws-sdk/client-s3";
import { PassThrough, Readable } from "node:stream";
import { Upload } from "@aws-sdk/lib-storage";
import { Demultiplexer } from "./demultiplexer";
import * as path from "node:path";
import Client = require("ssh2-sftp-client");

const BUCKET_NAME = "stream-to-s3-and-back";
const sftp = new Client();
const s3Client = new S3Client();

const config = {
  host: "cdk-output.server.transfer.ap-southeast-2.amazonaws.com",
  port: 22,
  username: "testuser",
  privateKey: readFileSync(path.join(__dirname, "../sftp_key")),
};

async function uploadToS3() {
  try {
    const data = [
      {
        flavor: "vanilla",
        topping: "sprinkles",
      },
      {
        flavor: "chocolate",
        topping: "fudge",
      },
      {
        flavor: "strawberry",
        topping: "whipped cream",
      },
    ];

    await sftp.connect(config);
    console.log("SFTP connected successfully");

    // We use a PassThrough to support uploading
    // via the pipeline API.
    const uploadToS3PassThrough = new PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: BUCKET_NAME,
        Key: "output.csv.gz",
        Body: uploadToS3PassThrough,
      },
    });

    // PassThroughs for Demux
    const s3UploadPassThrough = new PassThrough({
      objectMode: true,
    });
    const sftpUploadPassThrough = new PassThrough({
      objectMode: true,
    });

    const demux = new Demultiplexer([
      s3UploadPassThrough,
      sftpUploadPassThrough,
    ]);

    await Promise.all([
      pipeline(
        Readable.from(data),
        format({ quoteColumns: true, headers: ["flavor", "topping"] }),
        createGzip(),
        demux
      ),
      pipeline(s3UploadPassThrough, uploadToS3PassThrough),
      pipeline(
        sftpUploadPassThrough,
        sftp.createWriteStream(`/${BUCKET_NAME}/sftp-upload.csv.gz`)
      ),
    ]);

    await sftp.end();
    await upload.done();

    console.log("File processed and uploaded successfully");
  } catch (error) {
    console.error("Error:", error);
  }
}

async function main() {
  await uploadToS3();
}

main();

Be sure to replace `cdk-output.server.transfer.ap-southeast-2.amazonaws.com` with the endpoint you received from the CDK output.

This script is more complex than the others we've written so far, although it is a little contrived. Here's a breakdown of what it does:

  • Import Necessary Modules: Import various Node.js and AWS SDK modules for file handling, streaming, CSV formatting, compression, and S3 operations.
  • Define Constants: Define the S3 bucket name, SFTP client instance, and S3 client instance.
  • Read SFTP Private Key: Read the SFTP private key from a file.
  • Set SFTP Configuration: Set up the SFTP connection configuration.
  • Define uploadToS3 Function: Define an asynchronous function to handle the data processing and uploading.
  • Prepare Data: Create an array of data to be uploaded.
  • Connect to SFTP: Connect to the SFTP server using the configuration.
  • Create PassThrough Streams: Create PassThrough streams to facilitate the pipeline process.
  • Create S3 Upload Object: Create an upload object for S3 using the AWS SDK.
  • Create Demultiplexer: Create a Demultiplexer instance to split the data stream.
  • Setup Pipelines: Set up parallel pipelines for processing data, compressing it, and sending it to both S3 and SFTP.
  • End SFTP Connection: End the SFTP connection after the upload is complete.
  • Log Success or Error: Log the success message or handle errors if they occur.
  • Define main Function: Define a main function to call uploadToS3.
  • Execute main Function: Execute the main function to start the process.

You can visualize the process a bit better with the following flow chart:

Upload script flow chart

The significant part is our pipelines:

await Promise.all([
  pipeline(
    Readable.from(data),
    format({ quoteColumns: true, headers: ["flavor", "topping"] }),
    createGzip(),
    demux
  ),
  pipeline(s3UploadPassThrough, uploadToS3PassThrough),
  pipeline(
    sftpUploadPassThrough,
    sftp.createWriteStream(`/${BUCKET_NAME}/sftp-upload.csv.gz`)
  ),
]);

We are using the pipeline function to chain together our streams.

The first pipeline generates the CSV data, formats it, compresses it, and then demultiplexes it. The second pipeline sends the data to S3. The third pipeline sends the data to the SFTP server.

Our second and third pipelines are the receivers for our demultiplexer. Each PassThrough acts as a destination for the compression pipeline and as the source for the second or third pipeline, which do two very different things.

It is also important for us to await the end of the remote streams in our case. That is where the following code comes into play:

await sftp.end();
await upload.done();

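Without it, our script would end prematurely before the data is fully uploaded.

Note that Upload from @aws-sdk/lib-storage only begins reading its Body once done() is called, so this ordering relies on our small payload fitting inside the PassThrough buffers. For larger files, consider starting upload.done() alongside the pipelines in the same Promise.all.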
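To illustrate why waiting matters, here is a small, dependency-free sketch. It uses a deliberately slow in-memory Writable as a stand-in for a remote destination (an assumption for the demo, not how S3 or SFTP behave internally) and the `finished` helper from `node:stream/promises`. The `writeAndWait` name is hypothetical.

```typescript
import { Writable } from "node:stream";
import { finished } from "node:stream/promises";

// Hypothetical helper: writes chunks to a slow sink and only returns
// once the sink reports that everything has been flushed.
async function writeAndWait(chunks: string[]): Promise<string[]> {
  const flushed: string[] = [];

  // Each write takes 10ms to "land", mimicking a slow remote target.
  const slowSink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      setTimeout(() => {
        flushed.push(chunk.toString("utf8"));
        callback();
      }, 10);
    },
  });

  // Backpressure is ignored here because the demo only writes a
  // handful of tiny chunks.
  for (const chunk of chunks) {
    slowSink.write(chunk);
  }
  slowSink.end();

  // Without this await, `flushed` would still be empty at this point.
  await finished(slowSink);
  return flushed;
}
```

The same idea applies to the remote writes in our script: the local side can be done writing well before the destination has actually received everything.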

Next, we can run our script with the following command:

$ npx tsx src/upload.ts
SFTP connected successfully
File processed and uploaded successfully

The above assumes that you've configured your AWS credentials correctly and already deployed your infrastructure.

At this point, if you jump onto your AWS S3 bucket, you should see a file called output.csv.gz that contains the compressed CSV file as well as a file called sftp-upload.csv.gz that contains the same file that we uploaded through SFTP.

Writing our download script

At this point, we have successfully uploaded a file to S3 and SFTP. Now, let's write a script to download the file from S3 and SFTP, decompress it, and write it to disk.

Add the following to src/download.ts:

import { createWriteStream, readFileSync } from "node:fs";
import { parse } from "fast-csv";
import { createGunzip } from "node:zlib";
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { PassThrough, Readable, Transform } from "node:stream";
import { pipeline } from "node:stream/promises";
import { stringify } from "yaml";
import { Demultiplexer } from "./demultiplexer";
import * as path from "node:path";
import Client = require("ssh2-sftp-client");

const BUCKET_NAME = "stream-to-s3-and-back";
const sftp = new Client();
const s3Client = new S3Client();

const config = {
  host: "cdk-output.server.transfer.ap-southeast-2.amazonaws.com",
  port: 22,
  username: "testuser",
  privateKey: readFileSync(path.join(__dirname, "../sftp_key")),
};

const yamlTransformStream = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    this.push(stringify([chunk]) + "\n");
    callback();
  },
});

async function downloadFromS3() {
  try {
    await sftp.connect(config);
    console.log("SFTP connected successfully");

    const { Body: s3UploadBody } = await s3Client.send(
      new GetObjectCommand({
        Bucket: BUCKET_NAME,
        Key: "output.csv.gz",
      })
    );

    if (s3UploadBody instanceof Readable === false) {
      throw new Error("S3 object body is not a readable stream");
    }

    const csvStreamPassThrough = new PassThrough({
      objectMode: true,
    });
    const csvOutput = createWriteStream("downloaded_output.csv");

    const yamlStreamPassThrough = new PassThrough({
      objectMode: true,
    });
    const yamlOutput = createWriteStream("downloaded_output.yaml");

    const demux = new Demultiplexer([
      csvStreamPassThrough,
      yamlStreamPassThrough,
    ]);

    // Create a write stream for the SFTP CSV output
    const sftpCsvOutput = createWriteStream("downloaded_sftp_output.csv");
    const sftpReadableStream = sftp.createReadStream("output.csv.gz");

    // Buffer everything into the demux stream
    await Promise.all([
      // S3 download streams
      pipeline(s3UploadBody, createGunzip(), demux),
      pipeline(csvStreamPassThrough, csvOutput),
      pipeline(
        yamlStreamPassThrough,
        parse({ headers: true, objectMode: true }),
        yamlTransformStream,
        yamlOutput
      ),
      // SFTP download stream
      pipeline(sftpReadableStream, createGunzip(), sftpCsvOutput),
    ]);

    await sftp.end();

    console.log("File downloaded, decompressed, and processed successfully");
  } catch (error) {
    console.error("Error:", error);
  }
}

async function main() {
  await downloadFromS3();
}

main();

Again, you will need to update your SFTP endpoint in the script.

Let's break down what we are doing here:

  • Import Necessary Modules: Import Node.js and AWS SDK modules for file handling, streaming, CSV parsing, decompression, YAML stringifying, and S3 operations.
  • Define Constants: Define the S3 bucket name, SFTP client instance, and S3 client instance.
  • Read SFTP Private Key: Read the SFTP private key from a file.
  • Set SFTP Configuration: Set up the SFTP connection configuration.
  • Define YAML Transform Stream: Create a transform stream to convert CSV data to YAML format.
  • Define downloadFromS3 Function: Define an asynchronous function to handle data downloading, decompressing, and processing.
  • Connect to SFTP: Connect to the SFTP server using the configuration.
  • Download S3 Object: Use AWS SDK to download the compressed CSV file from S3.
  • Check S3 Body Type: Ensure the downloaded S3 object body is a readable stream.
  • Create PassThrough Streams: Create PassThrough streams to facilitate the pipeline process.
  • Create Write Streams: Create write streams for local CSV and YAML files, and for SFTP CSV output.
  • Create Demultiplexer: Create a Demultiplexer instance to split the data stream into CSV and YAML streams.
  • Setup Pipelines: Set up parallel pipelines for decompressing S3 data, splitting streams, and writing to local files.
  • Decompress SFTP Data: Set up a pipeline to download, decompress, and write SFTP CSV data to a local file.
  • End SFTP Connection: End the SFTP connection after the download and processing are complete.
  • Log Success or Error: Log the success message or handle errors if they occur.
  • Define main Function: Define a main function to call downloadFromS3.
  • Execute main Function: Execute the main function to start the process.
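The "Check S3 Body Type" step above could also be pulled out into a small assertion function so that TypeScript narrows the type for the rest of the code. This is only a sketch; `assertReadable` is a hypothetical helper, not part of the AWS SDK.

```typescript
import { Readable } from "node:stream";

// Hypothetical helper: throws unless `body` is a Node.js Readable and
// narrows its type for the caller when it is.
function assertReadable(body: unknown): asserts body is Readable {
  if (!(body instanceof Readable)) {
    throw new Error("S3 object body is not a readable stream");
  }
}

// Usage: after the assertion, `body` is typed as Readable.
const body: unknown = Readable.from(["example"]);
assertReadable(body);
body.resume();
```

This does the same runtime check as the inline `instanceof` comparison in the script, just packaged so it can be reused.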

Another flow chart to help visualize how the code is executing:

Download script flow chart

Again, it's the pipelines that play the most integral role:

// Buffer everything into the demux stream
await Promise.all([
  // S3 download streams
  pipeline(s3UploadBody, createGunzip(), demux),
  pipeline(csvStreamPassThrough, csvOutput),
  pipeline(
    yamlStreamPassThrough,
    parse({ headers: true, objectMode: true }),
    yamlTransformStream,
    yamlOutput
  ),
  // SFTP download stream
  pipeline(sftpReadableStream, createGunzip(), sftpCsvOutput),
]);

Our first three pipelines work with the demultiplexer to download the content from S3, then split that content into streams that write a CSV file and a YAML file respectively.

The fourth pipeline downloads the compressed CSV file from SFTP, decompresses it, and writes it to disk.
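If you want to try the compress and decompress steps without any AWS resources, a local round trip can be sketched with only the standard library. The `gzipRoundTrip` helper below is hypothetical: it gzips a string, gunzips it again and collects the result in memory instead of writing to disk.

```typescript
import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { createGzip, createGunzip } from "node:zlib";

// Hypothetical helper: text -> gzip -> gunzip -> text.
async function gzipRoundTrip(text: string): Promise<string> {
  const parts: Buffer[] = [];

  // In-memory sink standing in for createWriteStream(...).
  const sink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      parts.push(chunk);
      callback();
    },
  });

  await pipeline(Readable.from([text]), createGzip(), createGunzip(), sink);

  return Buffer.concat(parts).toString("utf8");
}
```

In the real script, the gzip half happens in the upload and the gunzip half in the download, with S3 or SFTP sitting in between.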

We can run this script with the following:

$ npx tsx src/download.ts
SFTP connected successfully
File downloaded, decompressed, and processed successfully

We will end up with three files written to disk:

  1. downloaded_output.csv: The CSV file downloaded from S3.
  2. downloaded_output.yaml: The YAML file generated from the CSV file.
  3. downloaded_sftp_output.csv: The CSV file downloaded from the SFTP server.

The content of (1) and (3) should be identical:

"flavor","topping"
"vanilla","sprinkles"
"chocolate","fudge"
"strawberry","whipped cream"

While the content of (2) should be:

- flavor: vanilla
  topping: sprinkles
- flavor: chocolate
  topping: fudge
- flavor: strawberry
  topping: whipped cream

Please be aware that awaiting the end of the remote streams is important again. We run into the same problem of prematurely ending the stream without it.

Otherwise, we have successfully uploaded a file to S3 and SFTP, downloaded it, decompressed it, and written it to disk!

Teardown

Once you are done with the project, you can tear down the infrastructure with the following command:

$ npx cdk destroy

Please be aware that all the content from the bucket needs to be deleted beforehand. If you get an error during teardown, head to the AWS console and manually delete the stack and bucket... and definitely please do. Transfer Family is sadly a bit $$$.

Conclusion

Congratulations on making it through the Node.js Streams blog series! By now, you should have a solid understanding of how to work with streams in Node.js, including the different types of streams and how they can be utilized.

In this post, we introduced the PassThrough stream and demonstrated its use in a practical example involving AWS CDK, AWS S3, and AWS Transfer Family. We explored how to create a pipeline for uploading and downloading files using Node.js streams, showcasing the versatility and power of streams in handling complex data flows.

Our key takeaways and lessons today:

  • PassThrough Streams: These streams allow data to pass through without modification, making them useful for observing data flow, delaying piping, or providing placeholders for future transformations.
  • AWS Integration: Leveraging AWS CDK, S3, and Transfer Family, we created a robust data pipeline for handling file uploads and downloads.
  • Demultiplexer: Implementing a custom demultiplexer stream allowed us to split data streams into multiple outputs efficiently.
  • Practical Application: By applying these concepts, we demonstrated how to handle real-world scenarios involving data processing and transfer in a scalable and efficient manner.

This journey through Node.js streams, from understanding the basics to implementing advanced pipelines, equips you with the knowledge to tackle various data processing tasks. Streams provide a powerful abstraction for handling data flows, and with the skills you've gained, you can now confidently apply them to your projects.

Resources and further reading

Disclaimer: This blog post used AI to generate the images used for the analogy.

Photo credit: robanderson72

Dennis O'Keeffe

Byron Bay, Australia
