One-to-many stream propogation inspired by nsqio/nsq#307 (comment)
'use strict'
{Transform} = require 'stream'
class Mux extends TransformEnabling object-mode allow us to safely send JavaScript objects via stream, please see http://nodejs.org/api/stream.html#stream_state_objects for detail.
constructor: ->
super
@_writableState.objectMode = true
@_readableState.objectMode = true
@_queue = []-
@param {Writable Stream} writable
-
@api public
-
@return {Mux} this
enqueue: (writable) => @_queue.push writable @emit 'enqueue' @
-
@param {Mixed} data
-
@param {String} encoding
-
@param {Function} callback
-
@api private
_transform: (data, enc, done) => writable = @_queue.pop()
writable streams are removed from queue in case of error, otherwise it is enqueued after fully consumes the data.
writable.write data, (error) =>
@enqueue writable unless error?immediately proceed to the next data if we have writable in queue. Backpressure is properly relayed to the upstream via done.
if @_queue.length > 0
do done
else
@once 'enqueue', ->
do done