-
-
Save CrabDude/1629758 to your computer and use it in GitHub Desktop.
// Sometimes we need to handle stream results serially... | |
// The question is, is there a better / more obvious way to do this? | |
var fs = require('fs'), | |
streamer = require('streamer'); | |
function serialize(source) { | |
var queue = []; | |
return function stream(next, stop) { | |
var processing = false, | |
completed = false; | |
source(function sourceNext(x) { | |
if (processing) { | |
queue.push(x); | |
return; | |
} | |
processing = true; | |
next(x, function() { | |
processing = false; | |
if (completed && !queue.length) { | |
stop(); | |
} else { | |
sourceNext(queue.shift()); | |
} | |
}); | |
}, function(err) { | |
if (err) return stop(err); | |
completed = true; | |
}); | |
}; | |
} | |
// a simple async list implementation | |
function asyncList(arr) { | |
return function stream(next, stop) { | |
var l = arr.length; | |
arr.forEach(function(v, k) { | |
setTimeout(function() { | |
next(v); | |
if (k===l-1) stop(); | |
}, 50); | |
}); | |
}; | |
} | |
serialize(asyncList([0,1,2,3]))(function(x, next){ | |
console.log('wait: ',x); | |
setTimeout(function() { | |
console.log('continue: ',x); | |
next(); | |
}, 50); | |
}, function() { | |
console.log('completed!'); | |
}); |
@Gozala Ahh, interesting. Thanks for the reply; I'll have to take a look.
Have you seen node-lazy? It's an event emitter / node.js Stream based approach to lazy streams. It's more familiar, but being non-functional, it would seem it's not "trivially parallizeable" as MapReduce / no-side-effect functional solution is. Also though, aside from the issue in the gist, IMHO streamer feels more elegant.
@CrabDude To be fully honest, back when I started with streamer I did not knew there was anything similar. Later people pointed me out node-lazy and streamjs which I looked at, but quickly rejected mainly because of method based approach instead of functional. At this point I'm no longer sure that function based approach was indeed better as I experiment with prototype based solution myself. Once I'll get more confidence with best possible solution, I'm planning to talking with both @pkrumins and @dionyziz about merging our efforts.
@CrabDude Yeah that was exactly why I decide to revisit streamer design. The fact that stream pushes values on you instead of you pulling from stream was kind of unpleasant in growing number of cases. So I decide to make streams lazy, which can be easily converted to greedy push streams. This work is currently under experimental/lazy branch and another version with alternative API in experimental/prototype. One of this two branches will most likely become master, so any feedback would be great!