Last active
September 18, 2017 19:59
-
-
Save IronSavior/0fdd16ee060ddbd04bc156a7eb0eec89 to your computer and use it in GitHub Desktop.
A through2-compatible Transform stream which processes chunks with limited (or unlimited) concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const {Transform} = require('readable-stream'); | |
// Symbol key under which each ConcurrentTransform instance keeps its internal bookkeeping
const PRIVATE = Symbol('ConcurrentTransform private state');
// Default per-chunk handler: hands the chunk back to the callback unchanged, with no error
const NOOP_TRANSFORM = (chunk, _, done) => done(null, chunk);
// Default end-of-stream handler: reports completion immediately
const NOOP_FLUSH = done => done();
// Transform stream which processes chunks with limited (or unlimited) concurrency
class ConcurrentTransform extends Transform {
  // @param opts {Object} Stream options. Set concurrency limit with "concurrent" (default concurrency: 1).
  //   "transform" and "flush" supply the per-chunk and end-of-stream handlers (both default to no-ops);
  //   every other key is handed to the Transform constructor.
  // @throws {Error} When "concurrent" is neither Infinity nor parseable as a base-10 integer >= 1
  // @throws {TypeError} When "transform" or "flush" is set to a truthy non-function value
  constructor( opts = {} ){
    // Work on a copy so the caller's options object is never modified
    opts = Object.assign({}, opts);
    // An explicit `concurrent: undefined` is treated the same as leaving the key out
    const requested = opts.concurrent === undefined ? 1 : opts.concurrent;
    // parseInt(Infinity) yields NaN, so unlimited concurrency has to be special-cased
    const limit = requested === Infinity ? Infinity : Number.parseInt(requested, 10);
    // NaN fails this comparison as well, so one test rejects NaN, zero and negative values
    if( !(limit >= 1) ) throw new Error(`Concurrent option must be integer > 0`);
    const user_transform = opts.transform || NOOP_TRANSFORM;
    const user_flush = opts.flush || NOOP_FLUSH;
    // Fail early with a readable message instead of an opaque ".bind is not a function"
    if( typeof user_transform !== 'function' ) throw new TypeError('Transform option must be a function');
    if( typeof user_flush !== 'function' ) throw new TypeError('Flush option must be a function');
    // These keys are consumed here and must not reach the base constructor
    delete opts.concurrent;
    delete opts.transform;
    delete opts.flush;
    super(opts);
    this[PRIVATE] = {
      running: 0,                  // transforms currently in flight
      limit,                       // maximum transforms allowed in flight
      queued_transform: undefined, // [chunk, encoding, callback] held back while at the limit
      flush_cb: undefined,         // pending _flush callback, set once the writable side has ended
      user_transform: user_transform.bind(this),
      start_transform: start_transform.bind(this),
      user_flush: user_flush.bind(this),
    };
  }
  // Accept a chunk: start it right away when below the limit, otherwise hold it until a slot frees up.
  // @param transform_started {Function} Callback that acknowledges the chunk to the stream
  _transform( chunk, encoding, transform_started ){
    const priv = this[PRIVATE];
    if( priv.running >= priv.limit ){
      // transform_started is deliberately left uncalled here; start_transform invokes it
      // once a running transform completes and this queued chunk is picked up
      priv.queued_transform = [chunk, encoding, transform_started];
      return;
    }
    priv.start_transform(chunk, encoding, transform_started);
  }
  // Remember the flush callback and finish immediately when nothing is in flight.
  // Otherwise the last transform to complete triggers final_flush.
  _flush( flush_cb ){
    const priv = this[PRIVATE];
    priv.flush_cb = flush_cb;
    if( !priv.running ) final_flush.call(this);
  }
  // @returns {Number} Count of currently running tasks
  get running(){
    return this[PRIVATE].running;
  }
  // @returns {Number} Maximum number of tasks allowed to run concurrently
  get concurrent(){
    return this[PRIVATE].limit;
  }
}
// Finalize stream state (called once all transforms have completed)
// Must be invoked with `this` set to the stream, after _flush has stored its callback.
function final_flush(){
  const priv = this[PRIVATE];
  const flush_cb = priv.flush_cb;
  // Clear the pending callback before handing it over so a later completion cannot flush a second time
  priv.flush_cb = undefined;
  priv.user_flush(flush_cb);
}
// Manage lifecycle of chunk transform invocation
// Must be invoked with `this` set to the stream.
// @param chunk, encoding  Forwarded unchanged to the user transform
// @param transform_started {Function} The stream's _transform callback for this chunk
// @returns Whatever the user transform returns
function start_transform( chunk, encoding, transform_started ){
  const priv = this[PRIVATE];
  priv.running++;
  // Acknowledge the chunk before the work finishes so further chunks can be accepted concurrently
  transform_started();
  let settled = false;
  return priv.user_transform(chunk, encoding, (err, result) => {
    // A repeated completion would decrement the counter twice and break the concurrency accounting
    if( settled ) return this.emit('error', new Error('Transform callback called multiple times'));
    settled = true;
    priv.running--;
    // NOTE: after an error nothing further is started and a pending flush is not completed
    if( err ) return this.emit('error', err);
    // Skip null as well as undefined: push(null) would signal end-of-stream on the readable side
    if( result !== undefined && result !== null ) this.push(result);
    const queued = priv.queued_transform;
    if( queued ){
      // A chunk was held back while at the limit; the slot just freed goes to it
      const [next_chunk, next_encoding, next_transform_started] = queued;
      priv.queued_transform = undefined;
      priv.start_transform(next_chunk, next_encoding, next_transform_started);
    }
    else if( priv.flush_cb && !priv.running ) final_flush.call(this);
  });
}
// Create a stream via through2()-compatible interface
// Call as through([opts], [transform], [flush]); opts may be omitted entirely.
// @returns {ConcurrentTransform} A new stream built from the merged options
ConcurrentTransform.through = (...args) => {
  const has_opts = typeof args[0] !== 'function';
  const [transform, flush] = has_opts ? args.slice(1) : args;
  // Copy so the caller's options object is left untouched
  const opts = Object.assign({}, has_opts ? args[0] : undefined);
  // Handlers named inside opts win; the positional ones fill in whatever opts leaves undefined,
  // so an explicit `transform: undefined` key no longer discards the positional function
  if( opts.transform === undefined ) opts.transform = transform;
  if( opts.flush === undefined ) opts.flush = flush;
  return new ConcurrentTransform(opts);
};
// Create a stream via through2::obj()-compatible interface
// Same arguments as through(); objectMode defaults to true but caller-supplied options can still override it.
ConcurrentTransform.through.obj = (...args) => {
  const [first, ...rest] = args;
  // No options object given: every argument is a handler
  if( typeof first === 'function' ) return ConcurrentTransform.through({objectMode: true}, ...args);
  return ConcurrentTransform.through(Object.assign({objectMode: true}, first), ...rest);
};
module.exports = ConcurrentTransform; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment