Node streams have a lot of issues: backpressure is awkward to control, errors don't propagate through a pipeline, and the implementation is too complicated to understand by reading the source.
To solve this, here is an implementation of pull-streams in modern JS, using promises, async iterators, and for await...of loops.
Features:
- Built-in backpressure.
- Built-in error propagation.
- Non-blocking by default.
- Super simple to use, implement, and reason about.
- Seamless fit into modern async flow control (async/await & promises).
Source (readable) streams are functions that create an async iterator.
Example:
// Transforms an iterable object (such as an array) into an async generator.
async function * values(iterable) {
  for (const item of iterable) {
    yield item;
  }
}
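Backpressure comes for free because an async generator's body only runs while a consumer is waiting on `next()`. The sketch below is one way to observe this; `counter()` and `slowSink()` are hypothetical names made up for the illustration, and the 20 ms delay is arbitrary.

```javascript
// Hypothetical example: `counter` and `slowSink` are not part of the code above.
const events = [];

// Source: yields 0..limit-1 and records each time a value is produced.
async function * counter(limit) {
  for (let i = 0; i < limit; i++) {
    events.push(`produce ${i}`);
    yield i;
  }
}

// Sink: takes about 20 ms to handle each value.
async function slowSink(asyncIterator) {
  for await (const value of asyncIterator) {
    events.push(`consume ${value}`);
    await new Promise(resolve => setTimeout(resolve, 20));
  }
}

const done = slowSink(counter(3)).then(() => {
  console.log(events.join(', '));
  return events;
});
// > produce 0, consume 0, produce 1, consume 1, produce 2, consume 2
```

The source never runs ahead of the sink: each value is produced only after the previous one has been fully handled.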
Transform streams are functions that accept an async iterator, and return another async iterator.
Example transform stream creators:
// Applies `fn()` to all values before passing them through.
function map(fn) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      yield fn(value);
    }
  };
}
// Lets through only values that pass the `test()`.
function filter(test) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      if (test(value)) {
        yield value;
      }
    }
  };
}
// Delays each read by `ms`.
function delay(ms) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      await new Promise(resolve => setTimeout(resolve, ms));
      yield value;
    }
  };
}
// Throws an error when value passing through matches `x`.
function errorOn(x) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      if (value === x) {
        throw new Error(`Value can't be "${x}"`);
      }
      yield value;
    }
  };
}
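A transform can also end the stream early. Because values are pulled on demand, this works even with a source that never ends. A minimal sketch, where `take()` and `naturals()` are hypothetical helpers written for this example:

```javascript
// Hypothetical transform: lets through the first `n` values, then ends the stream.
function take(n) {
  return async function * (asyncIterator) {
    if (n <= 0) return;
    let count = 0;
    for await (const value of asyncIterator) {
      yield value;
      // Leaving the loop early also closes the upstream iterator.
      if (++count >= n) return;
    }
  };
}

// Hypothetical endless source, used only to show that `take` stops pulling.
async function * naturals() {
  let i = 0;
  while (true) yield i++;
}

const firstThree = (async () => {
  const seen = [];
  for await (const value of take(3)(naturals())) {
    seen.push(value);
  }
  return seen;
})();

firstThree.then(seen => console.log(seen));
// > [ 0, 1, 2 ]
```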
Sinks are functions that accept an async iterator, and return a promise.
Example sinks:
// Logs all values pulled from the passed stream.
async function log(asyncIterator) {
  for await (const value of asyncIterator) {
    console.log(value);
  }
}
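// Reduces all values with `reducer()`, starting from `initial`.
function reduce(reducer, initial) {
  return async function (asyncIterator) {
    // Keep the accumulator local so the same sink can be reused on another stream.
    let acc = initial;
    for await (const value of asyncIterator) {
      acc = reducer(acc, value);
    }
    return acc;
  };
}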
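Since a sink is just an async function that loops over the iterator, other sinks follow the same shape. As an illustration, here is a sketch of a hypothetical `collect()` sink that gathers everything into an array; `values()` is repeated so the snippet runs on its own.

```javascript
// Source from the section above, repeated so this snippet is self-contained.
async function * values(iterable) {
  for (const item of iterable) {
    yield item;
  }
}

// Hypothetical sink: resolves with an array of everything the stream produced.
async function collect(asyncIterator) {
  const items = [];
  for await (const value of asyncIterator) {
    items.push(value);
  }
  return items;
}

const collected = collect(values([1, 2, 3]));
collected.then(items => console.log(items));
// > [ 1, 2, 3 ]
```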
A small pull helper lets us compose streams left to right:
// Passes the source (first argument) through each following stream, left to right.
function pull(...args) {
  let result = args.shift();
  for (const stream of args) {
    result = stream(result);
  }
  return result;
}
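A pipeline that does not end in a sink is itself an async iterator, so it can be consumed directly with `for await`. A minimal sketch, assuming a `pull()` that threads its first argument through the remaining stages (written here with `reduce()` for brevity; `values()` and `map()` are repeated so the snippet runs on its own):

```javascript
async function * values(iterable) {
  for (const item of iterable) {
    yield item;
  }
}

function map(fn) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      yield fn(value);
    }
  };
}

// Left-to-right helper, sketched with reduce(): source first, then each stage.
function pull(source, ...streams) {
  return streams.reduce((stream, next) => next(stream), source);
}

const doubled = (async () => {
  const out = [];
  // No sink at the end, so pull() returns an async iterator.
  for await (const value of pull(values([1, 2, 3]), map(x => x * 2))) {
    out.push(value);
  }
  return out;
})();

doubled.then(out => console.log(out));
// > [ 2, 4, 6 ]
```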
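Read numbers 0 to 4 and sum them up (the three variants are alternatives; a stream can be consumed only once, so try them one at a time):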
const arr = [0, 1, 2, 3, 4];
const numbers = values(arr); // source stream of numbers from arr
const sum = reduce((x, y) => x + y, 0); // sink to sum up all values
// raw
sum(numbers).then(x => console.log(x));
// with pull helper
pull(numbers, sum).then(x => console.log(x));
// in an async function
(async () => {
  console.log(await pull(numbers, sum));
})();
// All versions above log:
// > 10
Read numbers 0 to 4, delay each read by 300 ms, keep only the even numbers, and square them (run one variant at a time, since `numbers` can be consumed only once):
const arr = [0, 1, 2, 3, 4];
const numbers = values(arr); // source stream of numbers from arr
const wait300 = delay(300); // transform stream to add 300 ms delay to each read
const even = filter(x => x % 2 === 0); // transform stream to filter only even numbers
const square = map(x => x * x); // transform stream to square each number
// raw
log(square(even(wait300(numbers)))).then(() => console.log('done'));
// with pull helper
pull(numbers, wait300, even, square, log).then(() => console.log('done'));
// in an async function
(async () => {
  await pull(numbers, wait300, even, square, log);
  console.log('done');
})();
// All versions above log:
// > 0
// > 4
// > 16
// > done
Read numbers 0 to 4, but throw an error on 2:
const arr = [0, 1, 2, 3, 4];
pull(values(arr), errorOn(2), log).then(null, err => console.log(`Error: ${err.message}`));
// in an async function
(async () => {
  try {
    await pull(values(arr), errorOn(2), log);
  } catch (err) {
    console.error(`Error: ${err.message}`);
  }
})();
// All versions above log:
// > 0
// > 1
// > Error: Value can't be "2"
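Error propagation also runs upstream: when a stage throws inside its `for await` loop, the loop closes the iterator it was reading from, so a source can release resources in a `finally` block. The sketch below illustrates this with a hypothetical `valuesWithCleanup()` source; the `closed` flag stands in for real cleanup such as closing a file.

```javascript
let closed = false;

// Hypothetical source with cleanup: `finally` runs when the stream is closed early.
async function * valuesWithCleanup(iterable) {
  try {
    for (const item of iterable) {
      yield item;
    }
  } finally {
    closed = true; // stand-in for releasing a file handle, socket, etc.
  }
}

// Same transform as above, repeated so this snippet is self-contained.
function errorOn(x) {
  return async function * (asyncIterator) {
    for await (const value of asyncIterator) {
      if (value === x) {
        throw new Error(`Value can't be "${x}"`);
      }
      yield value;
    }
  };
}

const outcome = (async () => {
  const seen = [];
  try {
    for await (const value of errorOn(2)(valuesWithCleanup([0, 1, 2, 3, 4]))) {
      seen.push(value);
    }
  } catch (err) {
    return { seen, closed, message: err.message };
  }
  return { seen, closed, message: null };
})();

outcome.then(result => console.log(result));
// > { seen: [ 0, 1 ], closed: true, message: `Value can't be "2"` }
```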
async/await is going to land in Node (behind a flag) in October 2016, and without a flag possibly by mid-December (it is already in Chrome Canary).
Async generators are currently at stage 3 (candidate), which means the spec is complete and browser implementations should start landing soon.
Regardless, you can already use all of it with Babel transpilation. Try any of the code above in the Babel REPL.
And not a single callback was passed that day.