This document is now maintained at https://github.com/awwright/node-simplex-pair
-
-
Save awwright/ea75c4295d063636e964cac556000c13 to your computer and use it in GitHub Desktop.
This is a proposal for improving the modularity and simplicity of streams. It intends to be not-incompatible with previous stream interfaces.
There are multiple goals, many of which may be implemented independent of the others:
- Publish
SimplexPair
andDuplexPair
utilities, which allow streams to be created with a symmetric API - reduce the number of locations where data is buffered, for performance and to avoid "bufferbloat" situations
- create a symmetrical interface for producing or consuming streaming data (e.g. use
write
calls instead ofpush
) - standardizing the order of events, to make evaluation order more predictable
- to expose some events as promises, to provide better guarentees about runtime order
interface Source {
Event: "readable"
Event: "end"
Event: "error"
boolean readable;
function read([n]);
function close();
function pipe(dst);
Promise final;
static function [Symbol.hasInstance]();
function [Symbol.asyncIterator]();
}
This is the basic interface by which data on a stream is consumed.
The data that is read typically comes from one of two sources:
- The data is generated as it is read, as fast as the application can read it.
- The data is read from a buffer, which is filled from a separate Sink interface (see
SimplexPair
below).
Several different mechanisms may be used to work with the available data:
Source#on("readable")
andSource#read()
Source#pipe(dst)
for await (const segment of source){ ... }
via the async iterator,Source#on("data")
orSource#resume()
, if the Flowing API is implemented
The "readable" event signals that there is data on the stream, which may be read using Source#read
Any data that becomes available is immediately written to the Sink dst
.
True if Source#read
will return data if called. This is set to true
when "readable" is emitted, and false
if Source#read()
returns null
.
Returns up to n units of data from the stream. If there is no additional data to be read, this will return null
, and no more data will be available until the next "readable" event.
Indicates the application does not intend to consume any more data; this signals the Sink side to stop writing.
The "error" event signals an error
The "end" event signals an EOF event, that no more data will be available.
A Promise that resolves when the stream will have no more data available (an EOF), or rejects when no more data will be available due to an error event.
Reading this property will add an "error" listener, or otherwise turn off fatal errors when no listeners are attached to "error".
Returns true
for Source
or Duplex
, to implement new Duplex instanceof Source
.
Implements the async iterator pattern, that allows for the following:
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
Source#pause
, Source#resume
, Source#isPaused
, and Source#on("data")
may also be implemented for backwards compatibility with Streams 1. In this case, the stream starts "paused".
interface Sink {
Event: "drain"
Event: "close"
function write(segment)
function end(segment)
function cork()
function uncork()
static function [Symbol.hasInstance]();
}
This is the primary interface by which data is written to a stream. The data is typically stored in a buffer to be available for reading via a Source interface, or delivered to the operating system kernel.
This call writes data to the stream.
If this function returns false
, it indicates the stream has become congested and the application should stop writing to it. If the writes are processed from an upstream source, the application should stop reading from that source too. This is the primary mechanism used to control how fast data is sent through the stream.
This call writes the given segment to the stream, if provided; then closes the stream.
This event indicates that all the buffered data has been read, and the application should resume writing, if it has paused.
Indicates that the Source side has stopped reading the stream, and will not read any more data. Any further data written will create an error.
Data written to the Sink will be buffered, and not become readable until Sink#uncork()
is called.
This feature is typically used for optimizations, so that Sink#write may be called in succession with lots of small pieces of data, without being sent as separate packets over the network.
If fn
is provided, it will be called in-line, and the stream automatically uncork when the function returns, or if it returns a Promise, when the promise resolves.
Returns true
for Sink
or Duplex
, to implement new Duplex instanceof Sink
.
A Duplex is a stream that is both readable and writable: You can write data to it, and/or read data from it.
interface Duplex implements Source, Sink {
Source readableSide;
Sink writableSide;
}
It has two properties readableSide
and writableSide
which exposes only that "half" of the Duplex stream.
Two properties readableSide
and writableSide
are exposed, which allows you to pass only a Source or Sink stream to another part of the application. This feature allows you to create encapsulated streams, so that a pointer to a readable stream does not grant write access; and vice-versa.
There are primarily two types of Duplex streams: a PassThrough/SimplexPair, and each side of a DuplexPair.
ECMAScript does not have a concept of multiple inheritance, so the prototypes are copied in, and instanceof
support is implemented via Sink[Symbol.hasInstance]
and Source[Symbol.hasInstance]
.
SimplexPair
is one of the two primary ways that an application can make data available for reading on a Source interface. It is a special type of Duplex that keeps a modest buffer, which is filled through the Sink interface, and is drained from the Source interface.
It is essentially the same as a Node.js PassThrough, but with the addition of writableSide
and readableSide
properties inherited from the Duplex interface.
interface SimplexPair : Duplex {
function _read(n);
}
By using the readableSide
and writableSide
properties, these two functions operate the same way:
function a(){
const { writableSide, readableSide } = new SimplexPair();
process.nextTick(() => writableSide.end("foo\r\n"));
return readableSide;
}
function b(){
const pair = new SimplexPair();
process.nextTick(() => pair.end("foo\r\n"));
return pair.readableSide;
}
SimplexPair
forms the basis of how virtually all Source
and Sink
streams are created in userland.
This is an optimization around creating a Source stream that does not depend on data becoming available, but instead can be generated as fast as the receiving end can read it.
Whenever the application calls SimplexPair#read
, and there is insufficient data in the buffer, this will trigger a call to the user-implemented function _read
; at this time the user may generate the requested number of bytes, and either return them, or make equivalent (but probably suboptimal) calls using write
, end
, and/or destroy
(either directly on the SimplexPair
instance, or on the writableSide
property).
By default, this function will not write or return anything; and so cause the read()
call to return null
.
The size
argument will always be provided; and will often be some large value like 0x10000. The function does not have to generate this much data, but should not generate more than that.
Example function that generates a sequence of bytes, increasing from 00 to FF then repeating:
const a = new SimplexPair();
var counter = 0;
a._read = function(size){
const buf = new Uint8Array(size);
for(var i=0; i<size; i++) buf[i] = (counter++)%0xFF;
return buf;
}
interface DuplexPair : Array {
0: Duplex;
1: Duplex;
}
A DuplexPair is created when an application needs to return a Duplex stream; in this case, it creates a related pair of Duplex streams. What is written to the local side will become readable to the remote side; and vice-versa.
Most Duplex streams will be one side of a DuplexPair. The streams are created identically, and one or the other may be used without any difference in behavior.
Creating a Transform type stream with DuplexPair is trivial:
function ROT13(){
const [ inside, outside ] = new DuplexPair;
outside.on('readable', function(){
for(var buf; buf = inside.read();){
inside.write(buf.toString().replace(/[a-zA-Z]/g, function(c){
const d = c.charCodeAt(0) + 13;
return String.fromCharCode( ((c<="Z")?90:122)>=d ? d : d-26 );
}));
}
});
return outside;
}
process.stdin.pipe(ROT13()).pipe(process.stdout);
This forms the basis for the Transform#initInnerSide
method below.
- Expose the existing
DuplexPair
class - Reimplement
Duplex
as aSource
andSink
- Reimplement
Readable
,Writable
, andTransform
as ancestors ofSource
andSink
(see below) - Implement
Source#final
property - Implement
Source#close
- Implement
SimplexPair
interface Readable : Source {
Writable initWritableSide();
function _read();
function _destroy();
}
Uses _read
, _destroy
user-provided methods for compatibility with Streams 2.
It provides an initWritableSide
call that allows the subclass constructor to acquire a Writable reference that fills the buffer, drained by the instance of this class.
interface Writable : Sink {
Readable initReadableSide();
function _write(chunk, encoding, callback);
function _writev(chunks, callback);
function _destroy(err, callback);
function _final(callback);
}
Uses _write
, _writev
, _destroy
, and _final
user-provided methods for compatibility with Streams 2.
It provides an initReadableSide
call that allows the subclass constructor to acquire a Readable reference that drains the buffer, filled by the instance of this class.
interface Transform : Duplex {
Readable initInputSide();
Writable initOutputSide();
Duplex initInnerSide();
function _transform(chunk, encoding, callback);
function _flush(callback);
}
Uses _transform
and _flush
user-provided methods for compatibility with Streams 2.
It provides an initInnerSide
call that allows the subclass constructor to acquire a private reference to the "inner" Duplex side that reads input and writes output. It may be set to a local variable, a private class property, or a public class property, as the needs of the application demand; once initialized and returned, the reference cannot be re-acquired. For example:
class ROT13 extends Transform {
#inside;
constructor() {
super();
const inside = this.#inside = this.initInnerSide();
inside.on('readable', function(){
for(var buf; buf = inside.read();){
inside.write(buf.toString().replace(/[a-zA-Z]/g, function(c){
const d = c.charCodeAt(0) + 13;
return String.fromCharCode( ((c<="Z")?90:122)>=d ? d : d-26 );
}));
}
});
}
}
process.stdin.pipe(new ROT13()).pipe(process.stdout);