Last active
August 29, 2015 14:22
-
-
Save zenparsing/91852861fccd8a8ca956 to your computer and use it in GitHub Desktop.
mergeStreams with Async Iterators and Observables
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async function* flattenStreams(asyncIterList) { | |
function deferred() { | |
let capability = {}; | |
capability.promise = new Promise((resolve, reject) => { | |
capability.resolve = resolve; | |
capability.reject = reject; | |
}); | |
return capability; | |
} | |
// We maintain a queue of promises for the received items | |
let top = deferred(), | |
queue = [top], | |
total = 0, | |
finished = 0, | |
cleanup = false; | |
for (let iter of asyncIterList) { | |
total += 1; | |
// Spawn a "worker" for each input stream | |
(async iter => { | |
try { | |
// For each item, resolve the promise at the back of the queue | |
// and push another unresolved promise. | |
for async (let x of iter) { | |
if (cleanup) return; | |
top.resolve(x); | |
queue.push(top = deferred()); | |
} | |
} catch (x) { | |
// Same thing for errors | |
top.reject(x); | |
queue.push(top = deferred()); | |
} finally { | |
finished += 1; | |
} | |
})(iter); | |
} | |
// Finally, yield everything that gets put into the queue until | |
// all input streams are finished | |
try { | |
while (finished < total || queue.length > 0) { | |
yield await queue.shift().promise; | |
} | |
} finally { | |
cleanup = true; | |
} | |
} | |
// === A little test case === | |
async function* g1() { | |
yield 1; | |
await 1; | |
yield 2; | |
} | |
async function* g2() { | |
yield 3; | |
await 1; | |
await 1; | |
yield 4; | |
} | |
async function* g3() { | |
await 1; | |
yield 5; | |
await 1; | |
yield 6; | |
await 1; | |
} | |
(async _=> { | |
for async (let x of flattenStreams([g1(), g2(), g3()])) { | |
console.log(x); | |
} | |
})(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function mergeStreams(observableList) { | |
return new Observable(sink => { | |
let cancelList = [], | |
count = 0; | |
for (let observable of observableList) { | |
count += 1; | |
let cancel = observable.subscribe({ | |
next(value) { return sink.next(value) }, | |
throw(value) { return sink.throw(value) }, | |
return() { | |
if (--count === 0) | |
sink.return(); | |
}, | |
}); | |
cancelList.push(cancel); | |
} | |
return _=> { | |
for (let cancel of cancelList) | |
cancel(); | |
}; | |
}); | |
} | |
let o1 = new Observable(sink => { | |
sink.next(1); | |
sink.next(2); | |
sink.return(); | |
}); | |
let o2 = new Observable(sink => { | |
sink.next(3); | |
sink.next(4); | |
sink.return(); | |
}); | |
let o3 = new Observable(sink => { | |
sink.next(5); | |
sink.next(6); | |
sink.return(); | |
}); | |
mergeStreams([o1, o2, o3]).forEach(x => { | |
console.log(x); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment