Last active
September 28, 2015 14:56
-
-
Save zenparsing/9ab5a930fdf43f47de77 to your computer and use it in GitHub Desktop.
Observable zip with Various Return Types
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function zip(list) { | |
return new Observable(sink => { | |
function trySend() { | |
// If every buffer has at least one element, then send an array | |
// containing every first element and remove those elements from the | |
// buffer | |
if (streams.every(s => s.buffer.length > 0)) | |
sink.next(streams.map(s => s.buffer.shift())); | |
tryComplete(); | |
} | |
function tryComplete() { | |
// If there is a stream which is completed and whose buffer is empty, | |
// then send a complete signal to the sink | |
if (streams.some(s => s.completed && s.buffer.length === 0)) | |
sink.complete(); | |
} | |
let streams = Array.from(list, observable => { | |
let stream = { buffer: [], cancel: null, completed: false }; | |
stream.cancel = observable.subscribe({ | |
next(v) { | |
if (stream.buffer.push(v) === 1) | |
trySend(); | |
}, | |
error(e) { | |
return sink.error(e); | |
}, | |
complete() { | |
stream.completed = true; | |
tryComplete(); | |
}, | |
}); | |
return stream; | |
}); | |
return _=> { | |
// On cleanup, cancel each subscription | |
for (let stream of streams) | |
stream.cancel(); | |
}; | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function zip(list) { | |
return new Observable(sink => { | |
function trySend() { | |
// If every buffer has at least one element, then send an array | |
// containing every first element and remove those elements from the | |
// buffer | |
if (streams.every(s => s.buffer.length > 0)) | |
sink.next(streams.map(s => s.buffer.shift())); | |
tryComplete(); | |
} | |
function tryComplete() { | |
// If there is a stream which is completed and whose buffer is empty, | |
// then send a complete signal to the sink | |
if (streams.some(s => s.subscription.isUnsubscribed() && s.buffer.length === 0)) | |
sink.complete(); | |
} | |
let streams = Array.from(list, observable => { | |
let stream = { buffer: [], subscription: null }; | |
stream.subscription = observable.subscribe({ | |
next(v) { | |
if (stream.buffer.push(v) === 1) | |
trySend(); | |
}, | |
error(e) { | |
return sink.error(e); | |
}, | |
complete() { | |
tryComplete(); | |
}, | |
}); | |
return stream; | |
}); | |
return _=> { | |
// On cleanup, cancel each subscription | |
for (let stream of streams) | |
stream.subscription.unsubscribe(); | |
}; | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment