What are Node.js streams?
Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets, such as reading or writing from files and network requests, without compromising performance. Instead of loading an entire dataset into memory at once, streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the EventEmitter class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data-handling scenarios.
Event-driven architecture
Node.js thrives on an event-driven architecture, making it ideal for real-time I/O. This means consuming input as soon as it’s available and sending output as soon as the application generates it. Streams integrate seamlessly with this approach, enabling continuous data processing. They achieve this by emitting events at key stages: a data event when a chunk is received and an end event when the stream completes. Developers can listen for these events and execute custom logic accordingly. This event-driven nature makes streams highly efficient for processing data from external sources.
Why use streams?
Memory Efficiency
Streams process data incrementally, consuming and processing data in chunks rather than loading the entire dataset into memory. This is a major advantage when dealing with large datasets, as it significantly reduces memory usage and prevents memory-related performance issues.
Improved Response Time
Streams allow for immediate data processing. When a chunk of data arrives, it can be processed without waiting for the entire payload or dataset to be received. This reduces latency and improves your application’s overall responsiveness.
Scalability for Real-Time Processing
By handling data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes streams ideal for applications that process high volumes of data in real time.
If your application already has all the data readily available in memory, using streams might add unnecessary overhead, complexity, and slow down your application.
Stream history
This section is a reference for the history of streams in Node.js. Unless you’re working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use.
Streams 0
The first version of streams was released at the same time as Node.js. Although there wasn’t a Stream class yet, different modules used the concept and implemented the read/write functions. The util.pump() function was available to control the flow of data between streams.
Streams 1 (Classic)
With the release of Node v0.4.0 in 2011, the Stream class was introduced, as well as the pipe() method.
Streams 2
In 2012, with the release of Node v0.10.0, Streams 2 were unveiled. This update brought new stream subclasses, including Readable, Writable, Duplex, and Transform. Additionally, the readable event was added. To maintain backwards compatibility, streams could be switched to the old mode by adding a data event listener or calling the pause() or resume() methods.
Streams 3 (current)
In 2013, Streams 3 were released with Node v0.11.5 to address the problem of a stream having both data and readable event handlers. This removed the need to choose between ‘current’ and ‘old’ modes. Streams 3 is the current version of streams in Node.js.
Stream types
Readable
Readable is the class that we use to sequentially read a source of data. Typical examples of Readable streams in the Node.js API are fs.ReadStream when reading files, http.IncomingMessage when reading HTTP requests, and process.stdin when reading from the standard input.
Key methods and events
A readable stream operates with several core methods and events that allow fine control over data handling:

- on('data'): Triggered whenever data is available from the stream. It is very fast, as the stream pushes data as quickly as it can handle, making it suitable for high-throughput scenarios.
- on('end'): Emitted when there is no more data to read from the stream. It signifies the completion of data delivery and is only fired when all the data from the stream has been consumed.
- on('readable'): Triggered when there is data available to read from the stream or when the end of the stream has been reached. It allows for more controlled data reading when needed.
- on('close'): Emitted when the stream and its underlying resources have been closed, indicating that no more events will be emitted.
- on('error'): Can be emitted at any point, signaling that there was an error processing the data. A handler for this event can be used to avoid uncaught exceptions.
Basic readable stream
Here’s an example of a simple readable stream implementation that generates data dynamically: a MyStream class extends Readable and overrides the _read() method to push the string ":-)". After pushing the string five times, it signals the end of the stream by pushing null. An on('data') event handler logs each chunk to the console as it is received.
Advanced control with the readable event
For even finer control over data flow, the readable event can be used. This event is more complex but provides better performance for certain applications by allowing explicit control over when data is read from the stream. Inside the handler, read() is called in a loop until it returns null, which indicates that the buffer is temporarily empty or that the stream has ended. Setting highWaterMark to 1 keeps the buffer size small, triggering the readable event more frequently and allowing more granular control over the data flow.
With the previous code, the first readable event delivers two chunks in a row. When we attach the on('readable') handler, a first call to read() is made internally, because that call is what may trigger the emission of a readable event. After that event is emitted, we call read() on the first iteration of the while loop; that’s why we get the first two smileys in one row. After that, we keep calling read() until null is returned. Each call to read() schedules the emission of a new readable event, but because we are consuming via the readable event, that emission is deferred to the next tick.
You can try running the code with NODE_DEBUG=stream to see that emitReadable is triggered after each push(). To get a separate readable event per chunk, each push can be deferred by wrapping it in a setImmediate or process.nextTick callback.
Writable
Writable streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in the Node.js API are fs.WriteStream, process.stdout, and process.stderr.
Key methods and events in writable streams
- .write(): Writes a chunk of data to the stream. It buffers the data up to a defined limit (highWaterMark) and returns a boolean indicating whether more data can be written immediately.
- .end(): Signals the end of the data writing process. It tells the stream to complete the write operation and potentially perform any necessary cleanup.
Creating a writable stream
Here’s an example of creating a writable stream that converts all incoming data to uppercase before writing it to the standard output.
MyStream is a custom Writable stream with a buffer capacity (highWaterMark) of 10 bytes. It overrides the _write method to convert data to uppercase before writing it out.
The loop attempts to write hello ten times to the stream. If the buffer fills up (waitDrain becomes true), it waits for a drain event before continuing, ensuring we do not overwhelm the stream’s buffer.
The output will be the uppercased text, interleaved with pauses whenever the buffer fills up and the stream waits for the drain event.
Duplex
Duplex streams implement both the readable and writable interfaces. Duplex streams implement all the methods and events described in Readable and Writable Streams.
A good example of a duplex stream is the Socket class in the net module:
The following server writes Hello from server! to any connecting client and logs any data it receives.
A matching client connects to the server, sends a Hello from client message, and logs any received data.
Transform
Transform streams are duplex streams, where the output is computed based on the input. As the name suggests, they are usually used between a readable and a writable stream to transform the data as it passes through.
Key methods and events in transform streams
Apart from all the methods and events in duplex streams, there is:

- _transform: Called internally to handle the flow of data between the readable and writable parts. This MUST NOT be called by application code.
Creating a transform stream
To create a new transform stream, we can pass an options object to the Transform constructor, including a transform function that computes the output data from the input data using the push method.
How to operate with streams
When working with streams, we usually want to read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so.
.pipe()
The .pipe() method concatenates one readable stream to a writable (or transform) stream. Although this seems like a simple way to achieve our goal, it delegates all error handling to the programmer, making it difficult to get it right.
The following example shows a pipe trying to output the current file in uppercase to the console.
If the transform stream (call it upper) passes an error to its callback, that stream is destroyed; however, the other streams in the chain are not notified, resulting in memory leaks, and the error must be handled manually on each stream to avoid crashing the process.
pipeline()
To avoid the pitfalls and low-level complexity of the .pipe() method, in most cases, it is recommended to use the pipeline() method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically.
The following example demonstrates how pipeline() prevents the pitfalls of the previous approach.
The pipeline() method also has a promise-based version, available from node:stream/promises, which doesn’t accept a callback but instead returns a promise that is rejected if the pipeline fails.
Async iterators
Async iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the stream primitives in both the Web and Node.js, async iterators are easier to understand and use, contributing to fewer bugs and more maintainable code. In Node.js, all readable streams are asynchronous iterables. This means you can use the for await...of syntax to loop through the stream’s data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code.
Benefits of using async iterators with streams
Enhanced Readability
The code structure is cleaner and more readable, particularly when dealing with multiple asynchronous data sources.
Error Handling
Async iterators allow straightforward error handling using try/catch blocks, akin to regular asynchronous functions.
Flow Control
They inherently manage backpressure, as the consumer controls the flow by awaiting the next piece of data, allowing for more efficient memory usage and processing.
The following example reads a file through a readable stream and consumes it with for await...of, wrapping the iteration in a try...catch block to handle possible errors.
Object mode
By default, streams can work with strings, Buffer, TypedArray, or DataView. If an arbitrary value different from these (e.g., an object) is pushed into a stream, a TypeError will be thrown. However, it is possible to work with objects by setting the objectMode option to true. This allows the stream to work with any JavaScript value, except for null, which is used to signal the end of the stream.
When working in object mode, the highWaterMark option refers to the number of objects, not bytes.
Backpressure
When using streams, it is important to make sure the producer doesn’t overwhelm the consumer. For this, the backpressure mechanism is used in all streams in the Node.js API, and implementors are responsible for maintaining that behavior. In any scenario where the data buffer has exceeded the highWaterMark or the write queue is currently busy, .write() will return false.
When a false value is returned, the backpressure system kicks in. It will pause the incoming Readable stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a 'drain' event will be emitted to resume the incoming data flow.
For a deeper understanding of backpressure, check the backpressure guide.
Streams vs Web streams
The stream concept is not exclusive to Node.js. Node.js also ships a second implementation of the concept, called Web Streams, which implements the WHATWG Streams Standard. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. Web Streams implement the ReadableStream, WritableStream, and TransformStream classes, which are homologous to Node.js’s Readable, Writable, and Transform streams.
Interoperability of streams and Web streams
Node.js provides utility functions to convert to and from Web Streams. These functions are implemented as static toWeb() and fromWeb() methods on the Node.js stream classes.
The following example demonstrates how a Duplex stream can be converted with Duplex.toWeb() and then driven through the resulting pair of Web Stream readable and writable ends.
Web Streams also appear in everyday APIs such as fetch, whose response body is a Web Stream that can be consumed directly.
Be aware that the fetch body is a ReadableStream&lt;Uint8Array&gt;, and therefore a TextDecoderStream is needed to work with chunks as strings.
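A sketch of consuming a fetch body this way (the URL is a placeholder; assumes Node.js 18+, where fetch and the Web Streams classes are global):

```javascript
async function fetchText(url) {
  const response = await fetch(url);

  // response.body is a ReadableStream<Uint8Array>; piping it
  // through TextDecoderStream yields string chunks instead
  const textStream = response.body.pipeThrough(new TextDecoderStream());

  let text = '';
  for await (const chunk of textStream) {
    text += chunk;
  }
  return text;
}

// Placeholder URL for illustration
fetchText('https://example.com').then((body) => console.log(body.length));
```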