# Fan several input channels into one output channel.
#
# channels - array of channels to read from.
# into     - optional channel to write to; a fresh `chan()` is used when
#            `into` is null or undefined.
#
# One proc is started per input channel. Every value for which `final`
# reports false is forwarded to the output channel. The value for which
# `final` reports true ends that input's loop and is collected instead of
# forwarded. When the last input has ended, the output channel is closed
# with the array of collected values.
#
# NOTE(review): with an empty `channels` array no proc is started, so the
# output channel is never closed here — confirm that callers expect this.
#
# Returns the output channel.
merge = (channels, into) ->
  output = if into? then into else chan()
  pending = channels.length
  finals = []

  # Generator body run once per input channel.
  drain = (source) ->
    loop
      item = yield receive source
      break if final item
      yield send output, item
    # `item` now holds the value that ended the loop above.
    finals.push item
    pending -= 1
    # The last input to finish closes the shared output.
    output.close finals if pending is 0

  for source in channels
    proc drain source
  output
Manual generator version of the generator function above:
# Shared state for the hand-written (non-`yield`) merge.
#
# Fields read and updated by the per-channel MergeGenerator instances:
#   @output    - channel the merged values are sent to (`into`, or a new
#                `chan()` when `into` is null or undefined).
#   @semaphore - count of input channels that have not finished yet.
#   @results   - values collected as each input channel finishes.
class Merger
  constructor: (channels, into) ->
    @output = if into? then into else chan()
    @semaphore = channels.length
    @results = []
    # Start one generator-driven proc per input channel.
    for channel in channels
      proc new MergeGenerator(this, channel)
# Hand-rolled state machine that drains one input channel into the
# merger's output channel.
#
# @merger  - the Merger whose output/semaphore/results are shared.
# @channel - the input channel this instance reads from.
#
# NOTE(review): `@_step`, `@yield` and `@return` are presumably provided by
# the Generator base class (not visible here) — confirm their contracts.
class MergeGenerator extends Generator
  constructor: (@merger, @channel) -> super

  # Advance the state machine by one resumption.
  #
  # value - the value this generator is resumed with. In step 2 that is
  #         the outcome of the preceding receive.
  next: (value) ->
    loop
      step = ++@_step

      # Step 1: ask for the next value from the input channel.
      if step is 1
        return @yield receive @channel

      # Step 2: the receive has completed; forward the value or finish.
      if step is 2
        unless final()
          # Rewind so the next resumption starts over at step 1.
          @_step = 0
          return @yield send @merger.output, value

        # This input is done: record its last value, and let the last
        # input to finish close the shared output.
        @merger.results.push value
        remaining = --@merger.semaphore
        if remaining is 0
          @merger.output.close @merger.results
        return @return()
# Merge `channels` using the class-based Merger.
#
# channels - array of input channels.
# into     - optional output channel, passed through to Merger.
#
# Returns the Merger's output channel.
merge = (channels, into) ->
  merger = new Merger channels, into
  merger.output
The following version would require a new signature for `proc`: `(options, generator, args)`.
It takes an optional `finishers` channel, to which the input channels are sent in the order in which they become done.
TODO: review and confirm the proper use of `final` in the context of `yield select`.
# Proposed merge built on a proc call that receives an options object
# (`out: chan outbuffer`) followed by a generator function.
#
# channels  - array of input channels; it is copied with `slice()`, so the
#             splicing below does not touch the caller's array.
# outbuffer - passed to `chan` to create the `out` option.
# finishers - optional channel; when given, each input channel found to be
#             final is sent to it with `send.async`.
#
# NOTE(review): `send value` is called without a channel argument;
# presumably it targets the `out` option passed to proc — confirm.
merge = (channels, outbuffer, finishers) -> proc
out: chan outbuffer
->
# Work on a copy so finished channels can be removed as they complete.
inputs = channels.slice()
while inputs.length > 0
# Wait on all remaining inputs; the selection yields a value and the channel it came from.
if final {value, channel} = yield from select inputs...
if finishers? then send.async finishers, channel
# Drop the finished channel from the remaining inputs (first match only).
for ch, index in inputs when ch is channel
inputs.splice index, 1
break
else
yield send value
# NOTE(review): with the layout flattened it is unclear whether this trailing
# `finishers` is the generator's return value or a third argument to proc — confirm.
finishers