@hden
Last active August 29, 2015 13:56
Fair Dispatch Pattern for Node.js Stream

One-to-many stream propagation, inspired by https://github.com/bitly/nsq/issues/307#issuecomment-34694798

'use strict'

{Transform} = require 'stream'

class Mux extends Transform

Enabling object mode lets us safely pass JavaScript objects through the stream; see http://nodejs.org/api/stream.html#stream_state_objects for details.

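    constructor: ->
        # Request object mode through the documented constructor option
        # rather than by mutating the internal state objects afterwards.
        super objectMode: true
        # Writables that are currently idle and ready to receive data.
        @_queue = []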
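
To illustrate what object mode changes, here is a minimal sketch using Node's built-in PassThrough stream, which is not part of the original code. It assumes the `objectMode` constructor option; the object fields are made up for the example.

```coffeescript
{PassThrough} = require 'stream'

# PassThrough is the simplest Transform: it forwards every chunk unchanged.
stream = new PassThrough objectMode: true

# In object mode a chunk can be any JavaScript value, not only a Buffer or string.
stream.write {id: 1, payload: 'hello'}
stream.write {id: 2, payload: 'world'}

# read() hands back one whole object per call.
first = stream.read()
console.log first.id, first.payload
```
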

Enqueue

  • @param {Writable Stream} writable

  • @api public

  • @return {Mux} this

    enqueue: (writable) =>
        @_queue.push writable
        @emit 'enqueue'
        @

Transforming-dispatch

  • @param {Mixed} data

  • @param {String} encoding

  • @param {Function} callback

  • @api private

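    _transform: (data, enc, done) =>
        # With no writable enqueued yet, retry once one becomes available.
        if @_queue.length is 0
            return @once 'enqueue', => @_transform data, enc, done
        writable = @_queue.pop()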

A writable stream that reports an error is dropped from the queue; otherwise it is re-enqueued once it has fully consumed the data.

        writable.write data, (error) =>
            @enqueue writable unless error?

Proceed to the next chunk immediately if another writable is waiting in the queue; otherwise wait until one is enqueued. Because done is deferred until then, backpressure is relayed upstream.

        if @_queue.length > 0
            do done
        else
            @once 'enqueue', ->
                do done
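
A minimal usage sketch of the pattern, under stated assumptions: the class below is a condensed copy of the dispatch logic so that the example runs on its own, and it passes `objectMode` as a constructor option. The `makeWorker` helper, the worker names, and the delays are hypothetical and exist only for illustration.

```coffeescript
{Transform, Writable} = require 'stream'

# Condensed copy of the dispatch logic so this sketch is self-contained.
class Mux extends Transform
    constructor: ->
        super objectMode: true
        @_queue = []

    enqueue: (writable) ->
        @_queue.push writable
        @emit 'enqueue'
        this

    _transform: (data, enc, done) ->
        writable = @_queue.pop()
        # Re-enqueue the writable only after it has consumed the data.
        writable.write data, (error) =>
            @enqueue writable unless error?
        if @_queue.length > 0 then done() else @once 'enqueue', -> done()

# Hypothetical worker: a Writable that needs `delay` ms per job and
# records each job it handled in the shared `log` array.
makeWorker = (name, delay, log) ->
    new Writable
        objectMode: true
        write: (job, enc, callback) ->
            setTimeout (->
                log.push "#{name}:#{job}"
                callback()
            ), delay

log = []
mux = new Mux
mux.enqueue(makeWorker 'fast', 5, log).enqueue(makeWorker 'slow', 100, log)

mux.write job for job in [1..6]
mux.end()

# 'finish' fires once every job has been handed to a worker; wait a little
# longer so that the slow worker can complete before printing the log.
mux.on 'finish', -> setTimeout (-> console.log log), 200
```

Because a worker only returns to the queue after its write callback fires, the fast worker should end up handling most of the jobs while the slow one is still busy with its first.
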