Skip to content

Instantly share code, notes, and snippets.

@JoeSimmonds
Created April 6, 2018 13:43
Show Gist options
  • Select an option

  • Save JoeSimmonds/4465874f412a17bbf516c1fdf44d4df0 to your computer and use it in GitHub Desktop.

Select an option

Save JoeSimmonds/4465874f412a17bbf516c1fdf44d4df0 to your computer and use it in GitHub Desktop.
Streams and Promises
const {Writable, Readable, Transform} = require('stream');
class PromiseStream extends Readable {
constructor(options) {
super(Object.assign({objectMode:true, options}));
this.draining = false;
this.promiseCount = 0;
}
handleCompletion(x) {
this.promiseCount--;
console.debug(`Promise stream : promise completed, now waiting on ${this.promiseCount} promises.`);
this.push(x);
if (this.promiseCount <= 0 && this.draining) {
console.log("pushing null value to terminate the stream");
this.push(null);
}
};
shutdown() {
console.debug(`Promise stream shutdown, draining the pool of ${this.promiseCount} promises`);
this.draining = true;
};
addPromise(promise) {
this.promiseCount++;
console.debug(`Promise stream : promise added, now waiting on ${this.promiseCount} promises.`);
promise.then(x => this.handleCompletion.call(this, x));
};
_read() {
super.read();
}
}
class PromiseSink extends Writable {
constructor(options) {
super(Object.assign({objectMode:true, options}));
this.promiseStream = options.promiseStream;
this.name = options.name || "PromiseSink";
}
_write(obj, enc, cb) {
console.debug(this.name + " : promise sunk");
this.promiseStream.addPromise(obj);
cb();
}
_final(cb) {
console.debug(this.name + " : All promises have been sunk");
this.promiseStream.shutdown();
cb()
}
}
function pipeAwait(writableTarget) {
const promiseStream = new PromiseStream();
this.pipe(new PromiseSink({promiseStream}));
return promiseStream.pipe(writableTarget);
}
Readable.prototype.pipeAwait = Readable.prototype.pipeAwait || pipeAwait;
module.exports = {PromiseStream, PromiseSink};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment