A quick overview of the Node.js streams interface with basic examples.
This is based on @brycebaril's presentation, Node.js Streams2 Demystified.
Streams are a first-class construct in Node.js for handling data.
Think of them as lazy evaluation applied to data.
There are essentially three major concepts:
- source - where the data comes from
- pipeline - where you filter or transform your data as it passes through
- sink - where your data ultimately goes
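As a minimal sketch of these three concepts (the file names here are placeholders), a typical pipeline reads from a source, transforms the data in transit, and writes to a sink:

```js
const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('access.log')                // source: where the data comes from
  .pipe(zlib.createGzip())                       // pipeline: transform the data as it passes through
  .pipe(fs.createWriteStream('access.log.gz'));  // sink: where the data ultimately goes
```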
Benefits of using streams:
- lazily produce or consume data in buffered chunks
- evented and non-blocking
- low memory footprint
- automatically handle back-pressure
- buffers allow you to work around the V8 heap memory limit
- most core Node.js content sources/sinks are streams already!
Five classes of streams:
- Readable - sources
- Writable - sinks
- Duplex - both source and sink
- Transform - in-flight stream operations
- PassThrough - stream spy
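For example, a PassThrough stream forwards its input unchanged, which makes it useful for spying on data flowing through a pipeline. A minimal sketch (the file names are placeholders):

```js
const fs = require('fs');
const { PassThrough } = require('stream');

// A PassThrough "spy" that logs chunk sizes without altering the data.
const spy = new PassThrough();
spy.on('data', (chunk) => console.log('saw %d bytes', chunk.length));

fs.createReadStream('input.txt')
  .pipe(spy)
  .pipe(fs.createWriteStream('copy.txt'));
```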
Below is a quick overview of Readable, Writable, and Transform streams.
Use a Readable stream when supplying data as a stream.
Think: spigot/faucet.
To implement:
- Subclass stream.Readable.
- Implement a `_read(size)` method:
  - `size` is in bytes, but can be ignored (especially for objectMode streams)
  - `_read(size)` must call `this.push(chunk)` to send a chunk to the consumer
Options:
- `highWaterMark` number: maximum number of bytes to store in the internal buffer before ceasing to read (default: 16kb)
- `encoding` string: if set, buffers will be decoded to strings instead of passing buffers (default: `null`)
- `objectMode` boolean: instead of using buffers/strings, use JavaScript objects (default: `false`)
How to use:
- `readable.pipe(target)`
- `readable.read(size)`
- `readable.on("data", ... )`
See also:
- stream-spigot - creates readable streams from Arrays or simple functions
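A minimal sketch of the pattern above; the `Counter` class and the values it emits are made up for illustration:

```js
const { Readable } = require('stream');

// Emits the numbers 1 through 5 as strings, then ends the stream.
class Counter extends Readable {
  constructor(options) {
    super(options);
    this._n = 0;
  }

  _read(size) {
    this._n += 1;
    if (this._n > 5) {
      this.push(null);             // push(null) signals end-of-stream
    } else {
      this.push(String(this._n));  // send a chunk to the consumer
    }
  }
}

new Counter().on('data', (chunk) => console.log(chunk.toString()));
```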
Use a Writable stream when collecting data from a stream.
Think: drain/collect.
To implement:
- Subclass stream.Writable.
- Implement a `_write(chunk, encoding, callback)` method:
  - `chunk` is the content to write
  - Call `callback()` when you're done with this chunk
Options:
- `highWaterMark` number: maximum number of bytes to store in the internal buffer before `write()` starts returning `false` (default: 16kb)
- `decodeStrings` boolean: whether to decode strings into Buffers before passing them to `_write()` (default: `true`)
How to use:
- `source.pipe(sink)`
- `writable.write(chunk [,encoding] [,callback])`
See also:
- concat-stream - writable stream that concatenates strings or binary data and calls a callback with the result
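A minimal sketch of the pattern above; the `LengthLogger` class is made up for illustration:

```js
const { Writable } = require('stream');

// Logs the size of each chunk it receives.
class LengthLogger extends Writable {
  _write(chunk, encoding, callback) {
    console.log('received %d bytes', chunk.length);
    callback();  // signal that this chunk has been handled
  }
}

const sink = new LengthLogger();
sink.write('hello');
sink.end('world');
```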
Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream where the input and output streams are the same stream.
Think: filter/map.
To implement:
- Subclass stream.Transform.
- Implement a `_transform(chunk, encoding, callback)` method.
- Optionally implement a `_flush(callback)` method.

In `_transform(chunk, encoding, callback)`:
- Call `this.push(something)` to forward it to the next consumer.
- You don't have to push anything; this will skip a chunk.
- You must call `callback` one time per `_transform` call.

In `_flush(callback)`:
- When the stream ends, this is your chance to do any cleanup or last-minute `this.push()` calls to clear any buffers or work.
- Call `callback()` when done.
Options: a superset of the Readable and Writable options.
How to use:
- `source.pipe(transform).pipe(drain)`
- `transform.on("data", ... )`
See also:
- through2 - makes it easy to generate Transforms without all the subclassing boilerplate
- through2-map - Array.prototype.map analog for streams
- through2-filter - Array.prototype.filter analog for streams
- through2-reduce - Array.prototype.reduce analog for streams
- stream reducer demo - showing how to extend a Transform stream to create reducers/accumulators for streamed objects
- sculpt - a collection of transform stream utilities (all operating in `objectMode`)
- pipe-iterators - another collection of functions for iterating over object mode streams
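A minimal sketch of the pattern above, covering both `_transform` and `_flush`; the `Shout` class is made up for illustration:

```js
const { Transform } = require('stream');

// Upper-cases each chunk and appends a chunk count when the input ends.
class Shout extends Transform {
  constructor(options) {
    super(options);
    this._chunks = 0;
  }

  _transform(chunk, encoding, callback) {
    this._chunks += 1;
    this.push(chunk.toString().toUpperCase());  // forward to the next consumer
    callback();                                 // exactly one callback per _transform call
  }

  _flush(callback) {
    // Last chance to push before the stream ends.
    this.push(`\n(${this._chunks} chunks seen)\n`);
    callback();
  }
}

process.stdin.pipe(new Shout()).pipe(process.stdout);
```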