Skip to content

Instantly share code, notes, and snippets.

@richardscarrott
Last active February 16, 2021 13:21
Show Gist options
  • Save richardscarrott/29155e1d3026ecb20cc62e6c8cd0c246 to your computer and use it in GitHub Desktop.
Save richardscarrott/29155e1d3026ecb20cc62e6c8cd0c246 to your computer and use it in GitHub Desktop.
Highlandjs toPromise backpressure
const _ = require("highland");
const { Readable } = require("stream");
const wrapAsync = require("./wrap-async");
const sleep = (duration) =>
new Promise((resolve) => setTimeout(resolve, duration));
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000;
this._index = 1;
}
_read() {
console.log("Reading", this._index);
setTimeout(() => {
const i = this._index++;
if (i > this._max) this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, "ascii");
this.push(buf);
}
// }, 5000); // slow read
}, 10); // fast read
}
}
const main = async () => {
const result = await _(new Counter({ highWaterMark: 0 }))
.map(
wrapAsync(async (chunk) => {
console.log("Transforming", chunk.toString());
await sleep(5000); // slow transform
// await sleep(10); // fast transform
return chunk.toString();
})
)
.mergeWithLimit(3)
.map(
wrapAsync(async (chunk) => {
console.log("Writing", chunk);
await sleep(20000); // slow write
// await sleep(10); // fast write
return chunk;
})
)
.mergeWithLimit(3)
.reduce(undefined, (acc, chunk) => {
console.log("Discarding", chunk);
return acc;
})
.toPromise(Promise);
console.log("DONE: ", result);
};
main().catch((ex) => console.error(ex));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment