Your friends from pull-stream, but in terms of async iterators.
A "source" is something that can be consumed. It is an iterable object.
```js
const ints = {
  [Symbol.asyncIterator] () {
    let i = 0
    return {
      async next () {
        return { done: false, value: i++ }
      }
    }
  }
}
```
```js
// or, more succinctly using a generator:
const ints = (async function * () {
  let i = 0
  while (true) yield i++
})()
```
In Node.js stream terms this is a "readable".
A "sink" is something that consumes (or drains) a source. It is a function that takes a source and iterates over it. It optionally returns a value.
```js
const logger = async source => {
  const it = source[Symbol.asyncIterator]()
  while (true) {
    const { done, value } = await it.next()
    if (done) break
    console.log(value) // prints 0, 1, 2, 3...
  }
}
```
```js
// or, more succinctly using for/await:
const logger = async source => {
  for await (const chunk of source) {
    console.log(chunk) // prints 0, 1, 2, 3...
  }
}
```
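A sink can also return a value to its caller, as noted above. A minimal sketch (the name `collect` is illustrative):

```js
// A sink that returns a value: collect drains a source into an array.
const collect = async source => {
  const chunks = []
  for await (const chunk of source) {
    chunks.push(chunk)
  }
  return chunks
}
```

Since a sink is an async function, its return value is obtained with `await`, e.g. `const all = await collect(someFiniteSource)`.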
In Node.js stream terms this is a "writable".
A "transform" is both a sink and a source where the values it consumes and the values that can be consumed from it are connected in some way. It is a function that takes a source and returns a source.
```js
const doubler = source => {
  return {
    [Symbol.asyncIterator] () {
      const it = source[Symbol.asyncIterator]()
      return {
        async next () {
          const { done, value } = await it.next()
          if (done) return { done }
          return { done, value: value * 2 }
        },
        // Forward early termination to the source so it can clean up
        async return () {
          return it.return ? it.return() : { done: true }
        }
      }
    }
  }
}
```
```js
// or, more succinctly using a generator and for/await:
const doubler = source => (async function * () {
  for await (const chunk of source) {
    yield chunk * 2
  }
})()
```
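Forwarding `return()` matters because a consumer may stop early, and `for await` propagates that early exit to the underlying iterator automatically. A sketch of a transform that ends the stream early (the name `take` is illustrative, not part of the text above):

```js
// take(n): a transform that yields at most n values then stops.
// When this generator returns, for/await's early exit propagates a
// return() call to the source, letting an infinite source clean up.
const take = n => source => (async function * () {
  let remaining = n
  if (remaining <= 0) return
  for await (const chunk of source) {
    yield chunk
    if (--remaining === 0) return
  }
})()
```

This lets you safely consume a finite prefix of an infinite source like `ints`.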
A "duplex" is similar to a transform but the values it consumes are not necessarily connected to the values that can be consumed from it. It is an object with two properties, `sink` and `source`.
```js
const duplex = {
  sink: async source => {/* ... */},
  source: { [Symbol.asyncIterator] () {/* ... */} }
}
```
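To make the shape concrete, here is a minimal loopback duplex sketch: values written to its `sink` become readable from its `source`. A real duplex (e.g. a network socket) would have fully independent sides; `makeLoopback` is just an illustrative name.

```js
// A minimal loopback duplex: the sink buffers incoming values and the
// source yields them. The two sides communicate only through the queue.
const makeLoopback = () => {
  const queue = []
  let resolve = null
  let ended = false
  return {
    // sink: consume a source, buffering each value and waking the reader
    sink: async source => {
      for await (const value of source) {
        queue.push(value)
        if (resolve) { resolve(); resolve = null }
      }
      ended = true
      if (resolve) { resolve(); resolve = null }
    },
    // source: yield buffered values, waiting when the queue is empty
    source: (async function * () {
      while (true) {
        while (queue.length) yield queue.shift()
        if (ended) return
        await new Promise(r => { resolve = r })
      }
    })()
  }
}
```

Note the sink is not awaited before reading: both sides run concurrently, which is exactly what a transform-shaped `source => newSource` function cannot express.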
To thread together multiple streaming iterables, you can just call them:
```js
logger(doubler(ints))
```
This can look a bit back-to-front so you could use a convenience function to "pipe" them together and make the code more readable:
```js
const pipe = (...fns) => {
  let res
  while (fns.length) {
    res = fns.shift()(res)
  }
  return res
}

pipe(() => ints, doubler, logger)

// Note, we can await on pipe for the return value of logger:
// const result = await pipe(...)
```
https://github.com/alanshaw/it-pipe is a utility module that does this.
@achingbrain that's a transform, it's a special case of a duplex where the two sides are connected. That technique works for outgoing streams, but it doesn't work when you have an incoming stream you want to read from, transform, and then write back to.

e.g.

If duplex was `source => newSource` (i.e. a transform), my mind breaks thinking about how you'd make an echo server with that, or how you'd compose it to make a pipeline. There are also situations where you want to write without being blocked by a read, which could happen using a transform.

Some other reasons too? I'm happy to be corrected here, this just seemed most natural. Both pull streams and Node.js streams have the concept of duplex, so I feel like it's necessary!
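A sketch of how a pipe helper might thread a `{ sink, source }` duplex into a pipeline: when it meets one, it feeds the current source into the duplex's `sink` and continues with the duplex's `source`. This mirrors the idea behind https://github.com/alanshaw/it-pipe, but the name and code here are illustrative, not it-pipe's actual implementation.

```js
// Like the pipe helper above, but duplex-aware. Plain functions are
// treated as transforms/sinks; { sink, source } objects are duplexes.
const pipeWithDuplex = (first, ...rest) => {
  let source = typeof first === 'function' ? first() : first
  for (const stage of rest) {
    if (stage && typeof stage.sink === 'function') {
      stage.sink(source) // not awaited: the two sides run concurrently
      source = stage.source
    } else {
      source = stage(source)
    }
  }
  return source
}
```

With this, something echo-server-shaped becomes `pipeWithDuplex(incoming, transform, connectionDuplex)` — the duplex's sink receives the transformed stream while its source keeps producing independently.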