Skip to content

Instantly share code, notes, and snippets.

@quickredfox
Created October 3, 2012 16:04
Show Gist options
  • Save quickredfox/3827846 to your computer and use it in GitHub Desktop.
Save quickredfox/3827846 to your computer and use it in GitHub Desktop.
# Pipeline: event-emitting wrapper that sends one or more objects through
# filter chains.
#
# Can be invoked with or without `new`.
#
# objects - a single object or an Array of objects; a non-array value is
#           wrapped in a one-element Array.
#
# Returns the Pipeline instance, which exposes:
#   objects                          - the normalised Array of inputs.
#   filter( [filters], [callback] )  - starts a run (documented below).
#
# NOTE(review): the instance relies on `on`/`once`/`emit`, so Pipeline's
# prototype is presumably wired to events.EventEmitter elsewhere in the
# file — confirm.
Pipeline = ( objects )->
    unless @ instanceof Pipeline then return new Pipeline( objects )
    events.EventEmitter.call pipe = @
    pipe.objects = if Array.isArray( objects ) then objects else [ objects ]

    # Starts a filtering run over `pipe.objects`.
    #
    # filters  - optional value handed to FilterChain.setup( filters: ... )
    #            before the run starts.
    # callback - optional node-style function( error, metas ), invoked at
    #            most once per run. May be passed as the only argument.
    #
    # Emits 'start' right away. When a callback is supplied, 'complete' is
    # emitted with ( 'error', error ) or ( 'end', metas ) just before the
    # callback is invoked.
    pipe.filter = ( filters, callback )->
        if typeof filters is 'function'
            callback = filters
            filters  = null
        pipe.metas = []
        if callback
            # Listeners are scoped to this run: whichever outcome fires first
            # detaches the other, so repeated filter() calls do not pile up
            # handlers that would re-invoke callbacks from earlier runs.
            onError = ( error )->
                pipe.removeListener 'end', onEnd
                pipe.emit 'complete', 'error', error
                callback error
            onEnd = ( metas )->
                pipe.removeListener 'error', onError
                pipe.emit 'complete', 'end', metas
                callback null, metas
            pipe.once 'error', onError
            pipe.once 'end', onEnd
        pipe.emit 'start'
        if filters then FilterChain.setup( filters: filters )
        Pipeline.filter( pipe )

    # Explicit return: without it the constructor would return the `filter`
    # function assigned above, which would replace the instance under `new`.
    return pipe
# Runs every object of `pipe` through its own FilterChain and relays the
# chains' progress on `pipe`.
#
# For each chain:
#   * 'start', 'end', 'error' and 'complete' are re-emitted on `pipe` as
#     '<event>.chain';
#   * 'start.filter', 'end.filter', 'error.filter' and 'complete.filter' are
#     re-emitted on `pipe` as '<event>.chain.filter';
#   * the 'end' payload (or `{ error }` on 'error') is stored in `pipe.metas`
#     at the index of the originating object.
# Once every chain has emitted 'complete', `pipe` emits 'end' with
# `pipe.metas`.
#
# pipe - a Pipeline instance providing `objects`, `metas` and `emit`.
#
# Returns nothing.
Pipeline.filter = ( pipe )->
    chains    = pipe.objects.map ( object )-> new FilterChain( object )
    remaining = chains.length

    # Nothing to process: finish immediately rather than never emitting 'end'.
    if remaining is 0
        pipe.emit 'end', pipe.metas
        return

    # Builds a listener that re-emits a chain event on the pipe under `name`,
    # placing the originating chain ahead of the original arguments.
    # NOTE(review): the original prepended an undefined `filter` identifier;
    # the originating chain is assumed to be what listeners need — confirm.
    relay = ( chain, name )->
        ( args... )-> pipe.emit name, chain, args...

    chains.forEach ( chain, index )->
        ['start','end','error','complete'].forEach ( event )->
            chain.once event, relay( chain, "#{event}.chain" )
        # NOTE(review): `once` forwards only the first occurrence of each
        # filter-level event per chain; kept as in the original — confirm
        # that is intended when a chain runs several filters.
        ['start.filter','end.filter','error.filter','complete.filter'].forEach ( event )->
            prefix = event.slice( 0, event.indexOf('.') )
            chain.once event, relay( chain, "#{prefix}.chain.filter" )
        chain.once 'end', ( meta )->
            pipe.metas[index] = meta
        chain.once 'error', ( error )->
            pipe.metas[index] = { error: error }
        # Count completions explicitly: `metas.length` jumps ahead when a
        # later chain finishes first, because assigning metas[index] extends
        # the array.
        chain.once 'complete', ->
            remaining -= 1
            if remaining is 0 then pipe.emit 'end', pipe.metas
        return
    return
# Forwards pipeline-wide defaults to FilterChain.setup.
#
# options - optional object:
#   pollDelay - passed through as `pollDelay`; 100 is used when it is
#               missing or null (an explicit 0 is respected).
#   filters   - passed through as `filters`; [] is used when it is falsy.
#
# Returns whatever FilterChain.setup returns.
Pipeline.setup = ( options = {} )->
    FilterChain.setup
        pollDelay: options.pollDelay ? 100
        filters:   options.filters or []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment