Every stream has two basic parts:
- data-in - A chunk or EOF enters the stream
- data-out - A chunk or EOF leaves the stream
I'm combining both "data" and "end" events since "end" is just a special data packet signifying the end of the stream.
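To make that concrete, here is a minimal sketch of a stream with only those two parts. The `makeQueue` helper and the `EOF` symbol are names I made up for illustration; they aren't from any existing library.

```javascript
// Hypothetical in-memory stream: one entry point for data-in, one for data-out.
// EOF is a sentinel value, so "end" travels through the same channel as data.
const EOF = Symbol("EOF");

function makeQueue() {
  const items = [];
  return {
    // data-in: a chunk or EOF enters the stream
    write(chunk) { items.push(chunk); },
    // data-out: a chunk or EOF leaves the stream, in arrival order
    read() { return items.shift(); },
    size() { return items.length; }
  };
}

const q = makeQueue();
q.write("hello");
q.write("world");
q.write(EOF); // the end of the stream is just one more packet

const seen = [];
let chunk;
// Keep reading until the EOF packet comes out the other side
while ((chunk = q.read()) !== EOF) seen.push(chunk);
```

The consumer needs no separate "end" listener here; it only has to check each packet against the sentinel.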
Flow control deals with situations where the reader and writer of a stream aren't moving at the same speed. If the reader is faster, it's not a problem: the queue drains and the reader simply waits for more data to appear. But when the writer is faster, data queues up inside the stream and memory use grows without bound. What is needed is some mechanism to tell the writer to slow down when the reader is overwhelmed and to resume once it has caught up.
For proper flow control, some more controls are needed:
- pause-in - Inform the input we need a rest; data already in the queue will still emit.
- resume-in - Inform the input to resume pumping data.
- pause-out - We don't want the output to emit any more data; buffer until resumed.
- resume-out - Drain the buffer and emit events again.
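As a rough sketch of how pause-in and resume-in could be driven automatically, here is a buffered stream with high-water and low-water marks. Everything in it (`makeBufferedStream`, the option names, the callbacks) is an assumption for illustration, not an existing API.

```javascript
// Hypothetical buffered stream. When the queue reaches highWater it asks the
// writer to pause; once the reader drains it down to lowWater it asks the
// writer to resume.
function makeBufferedStream({ highWater, lowWater, onPauseIn, onResumeIn }) {
  const queue = [];
  let inputPaused = false;
  return {
    // data-in
    write(chunk) {
      queue.push(chunk);
      if (!inputPaused && queue.length >= highWater) {
        inputPaused = true;
        onPauseIn(); // pause-in: ask the writer to rest
      }
    },
    // data-out: queued data still drains while the input is paused
    read() {
      const chunk = queue.shift();
      if (inputPaused && queue.length <= lowWater) {
        inputPaused = false;
        onResumeIn(); // resume-in: ask the writer to continue
      }
      return chunk;
    }
  };
}

const events = [];
const s = makeBufferedStream({
  highWater: 3,
  lowWater: 1,
  onPauseIn: () => events.push("pause-in"),
  onResumeIn: () => events.push("resume-in")
});
s.write("a");
s.write("b");
s.write("c"); // queue reaches the high-water mark, pause-in fires
s.read();     // two chunks left, still above the low-water mark
s.read();     // one chunk left, resume-in fires
```

Keeping the two marks apart avoids flapping between paused and resumed on every single chunk.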
Sometimes we're consuming a stream and after parsing data we realize that not all the data in the chunk was for us. The last part was for another piece of code that's expecting to have it come out of the stream.
A concrete example of this is WebSockets over HTTP. Suppose the stream chunk contains the HTTP headers and some WebSocket data. When we read this chunk and send it to the HTTP parser, it will notify us that not all the data was used. We then need to stop listening and hand off the stream to another part of the code that speaks the WebSocket protocol, but we need some way to return the extra data back to the stream.
- data-return - We grabbed too much data and want to return some for other code to consume.
- data-peek-and-flush - Another technique is to peek and then manually flush the parts we consumed
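Here is a small sketch of the data-return approach for the HTTP upgrade case. `makeReadable` and `parseHead` are made-up stand-ins; a real HTTP parser would report the unused bytes in its own way, and the head may span several chunks.

```javascript
// Hypothetical readable end with a data-return (unshift) operation.
function makeReadable(chunks) {
  const queue = chunks.slice();
  return {
    read() { return queue.shift(); },
    // data-return: put unconsumed data back at the front of the queue
    unshift(chunk) { queue.unshift(chunk); }
  };
}

// Stand-in parser: consumes everything up to and including the blank line
// that ends the HTTP head. Simplification: the head fits in one chunk.
function parseHead(chunk) {
  const end = chunk.indexOf("\r\n\r\n");
  if (end === -1) return { head: chunk, rest: "" };
  return { head: chunk.slice(0, end + 4), rest: chunk.slice(end + 4) };
}

const stream = makeReadable([
  "GET /chat HTTP/1.1\r\nUpgrade: websocket\r\n\r\n<frame-1>",
  "<frame-2>"
]);

const { head, rest } = parseHead(stream.read());
// We grabbed too much, so hand the leftover back before anyone else reads
if (rest.length > 0) stream.unshift(rest);

// The WebSocket code takes over and sees its data from the first byte
const firstFrame = stream.read();
const secondFrame = stream.read();
```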
How do error handling and propagation fit into streams? Should they be related or go through some other communication channel? There is a difference between a stream crashing (internet disconnect, fs error, power outage...) and a finite stream reaching its end.
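One possible answer, and only one of several, is to pass errors through the same callback as data but in a separate argument, so a crash can never be mistaken for a normal end. The `classify` function below is a made-up illustration of that `(err, chunk)` convention.

```javascript
// Hypothetical consumer callback using the (err, chunk) convention:
//   err is set          -> the stream crashed
//   chunk is undefined  -> the stream ended normally
//   otherwise           -> ordinary data
function classify(err, chunk) {
  if (err) return "error: " + err.message;
  if (chunk === undefined) return "end";
  return "data: " + chunk;
}

const log = [
  classify(null, "abc"),
  classify(null, undefined),
  classify(new Error("connection reset"))
];
```

The alternative is a fully separate error channel, which keeps the data path simpler but means every consumer has to remember to listen on both.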
There are many APIs for dealing with streams. For readable streams, I've seen two main camps.
In pull-style APIs, the consumer repeatedly calls some read function when it wants data. The stream can infer the consumer's flow-control speed from how fast it calls read. If the consumer stops calling read, maybe we should pause the input.
function read() {
  stream.read(function (chunk) {
    // Do Something
    read();
  });
}
read();
Push style APIs are often easier to use, but make flow-control more explicit. Since the stream emits data as it gets it, the consumer has no choice but to handle it when it happens. When it's not ready to read more data, it needs to explicitly pause the stream while it drains.
stream.on("data", function (chunk) {
  // Do something
});
stream.pause();
stream.resume();
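To show what the explicit pause looks like in practice, here is a sketch of a tiny push-style stream that buffers while paused. `makePushStream` and its `push` method are hypothetical; this is not Node's stream implementation.

```javascript
// Hypothetical push-style stream: delivers chunks to the handler as they
// arrive, unless paused, in which case it buffers them until resume().
function makePushStream() {
  let handler = null;
  let paused = false;
  const buffer = [];
  return {
    on(event, fn) { if (event === "data") handler = fn; },
    pause() { paused = true; },
    resume() {
      paused = false;
      // Drain what piled up, stopping if the handler pauses again
      while (!paused && handler && buffer.length > 0) handler(buffer.shift());
    },
    // Called by the producer side
    push(chunk) {
      if (paused || !handler) buffer.push(chunk);
      else handler(chunk);
    }
  };
}

const stream = makePushStream();
const received = [];
stream.on("data", function (chunk) {
  received.push(chunk);
  if (chunk === "b") stream.pause(); // the consumer is busy after "b"
});

stream.push("a");
stream.push("b"); // the handler pauses the stream here
stream.push("c"); // buffered, not delivered
const beforeResume = received.slice();
stream.resume();  // "c" is delivered now
```

The consumer has to remember to call both `pause` and `resume` itself, which is exactly the bookkeeping a pull-style API gets for free.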
My goal is to design an API that fits my needs and is simple to use. Ideally the interface is so simple that a helper library isn't needed, but I think I'll end up needing one in the end to get all the useful bits like high-water and low-water marks.
All streams are both readable and writable by someone, but to the end-user consuming the stream, they usually only have one end. In the case of something like a duplex TCP stream, this is actually two streams and the user has the readable end of one and the writable end of another. This means that a stream can export its readable or writable interface in some concise manner so that they can be packaged into a single public object.
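A sketch of that packaging, using a deliberately simplified synchronous stream. `newSimpleStream` and `newDuplexPair` are names invented for this illustration.

```javascript
// Hypothetical one-directional stream with a writable end and a readable end.
// The methods close over the queue instead of using `this`, so each end can
// be handed out on its own.
function newSimpleStream() {
  const queue = [];
  return {
    write(chunk) { queue.push(chunk); },
    read() { return queue.shift(); }
  };
}

// A duplex connection is two streams. Each side gets the writable end of one
// and the readable end of the other, packaged as a single object.
function newDuplexPair() {
  const aToB = newSimpleStream();
  const bToA = newSimpleStream();
  return {
    a: { write: aToB.write, read: bToA.read },
    b: { write: bToA.write, read: aToB.read }
  };
}

const { a, b } = newDuplexPair();
a.write("ping");
const gotByB = b.read();
b.write("pong");
const gotByA = a.read();
```

Neither side can reach the end it wasn't given, so the public object exposes exactly the interface the user is supposed to have.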
This stream is for reading a file from the filesystem as a stream. The public half is the readable end. The internal implementation has access to the writable half and writes to it. This way the person implementing the fs stream only has to interface with the writable end instead of implementing the readable end. A helper library is very useful here. The filesystem is pull-based because of the APIs we have available.
local function newFileReadStream(fd, onDone)
  local stream = newStream()
  local offset = 0
  local chunkSize = 40960
  local function read(err)
    if err then error(err) end
    uv.read(fd, offset, chunkSize, function (err, chunk)
      if err then error(err) end
      -- chunk will be nil when we've reached the end of the file
      if chunk then
        offset = offset + #chunk
      else
        onDone()
      end
      -- The stream will call read immediately if it wants more data
      -- It will call it later if it wants us to slow down
      stream.write(chunk)(read)
    end)
  end
  read()
  -- Export just the readable half
  return {
    read = stream.read,
    unshift = stream.unshift
  }
end
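The `stream.write(chunk)(read)` call relies on the stream deciding when to invoke the callback. The helper itself isn't shown here, so the following is only a guess at how it might behave, written in JavaScript to match the API examples earlier: `write` and `read` each return a function that takes a callback, and the write callback is held back while the queue is at its high-water mark. The `highWater` parameter and all internals are assumptions.

```javascript
// Hypothetical sketch of a newStream-like helper (not the real one).
function newStream(highWater = 2) {
  const queue = [];         // chunks waiting for a reader
  let waitingReader = null; // reader callback waiting for data
  let waitingWriter = null; // writer callback held back for flow control

  function write(chunk) {
    return function (callback) {
      if (waitingReader) {
        // A reader is already waiting: hand the chunk straight over
        const reader = waitingReader;
        waitingReader = null;
        reader(null, chunk);
        return callback();
      }
      queue.push(chunk);
      // Below the mark: call back immediately so the writer keeps going
      if (queue.length < highWater) return callback();
      // At the mark: call back later, once a reader has drained something
      waitingWriter = callback;
    };
  }

  function read() {
    return function (callback) {
      if (queue.length === 0) { waitingReader = callback; return; }
      callback(null, queue.shift());
      if (waitingWriter && queue.length < highWater) {
        const writer = waitingWriter;
        waitingWriter = null;
        writer(); // tell the writer it may continue
      }
    };
  }

  return { write, read };
}

const stream = newStream(2);
const log = [];
stream.write("a")(() => log.push("a accepted"));
stream.write("b")(() => log.push("b accepted")); // held: queue is full
stream.read()((err, chunk) => log.push("read " + chunk));
```

With this shape the writer never needs an explicit pause or resume call; how quickly its callback fires is the flow-control signal.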
local function newFileWriteStream(fd, onDone)
  local stream = newStream()
  local offset = 0
  local chunkSize = 40960
  local function write(err, chunk)
    if err then error(err) end
    if not chunk then
      return onDone()
    else
      uv.write(fd, offset, chunk, function (err)
        if err then error(err) end
        offset = offset + #chunk
        stream.read()(write)
      end)
    end
  end
  stream.read()(write)
  -- Export just the writable half
  return {
    write = stream.write
  }
end
This is a stream for reading from and writing to a TCP socket. The socket is push-style when reading.
-- Handle is a uv_tcp_t instance from uv, it can be either client or server,
-- the API is the same
local function newHandleStream(handle)
  -- Connect data coming from the socket to emit on the stream
  local receiveStream = newStream()
  local function write(handle, chunk)
    -- If write doesn't callback sync, then we need to pause and resume the socket
    local async
    receiveStream.write(chunk)(function (err)
      if err then error(err) end
      if async == nil then async = false end
      if async then
        handle:readStart()
      end
    end)
    if async == nil then
      async = true
      handle:readStop()
    end
  end
  handle.ondata = write
  handle.onend = write
  -- Connect data being written to the stream and write it to the handle
  local sendStream = newStream()
  local function read(err)
    if err then error(err) end
    sendStream.read()(function (err, chunk)
      if err then error(err) end
      if chunk then
        handle:write(chunk, read)
      else
        handle:shutdown(read)
      end
    end)
  end
  read()
  -- Return the halves of the streams we're not using
  return {
    read = receiveStream.read,
    unshift = receiveStream.unshift,
    write = sendStream.write
  }
end
...