Skip to content

Instantly share code, notes, and snippets.

@zerobias
Created February 8, 2018 19:29
Show Gist options
  • Save zerobias/7b3b0aaa031cd53721567059c6104378 to your computer and use it in GitHub Desktop.
Save zerobias/7b3b0aaa031cd53721567059c6104378 to your computer and use it in GitHub Desktop.
//@flow
import type {Stream} from 'most'
function intoAsync<T>(str: Stream<T>): $AsyncIterable<T, void, any> {
let isEnd = false
let ask: Array<Dispose<T>> = []
let tell: Array<T> = []
const end = str.observe(t => {
if (ask.length > 0) {
const dispose = ask[0]
ask = ask.slice(1)
dispose.disposed(t)
} else {
tell.push(t)
}
})
end.then(() => isEnd = true)
return async function* Runner(): $AsyncIterable<T, void, any> {
while (!isEnd) {
if (tell.length > 0) {
const value = tell[0]
tell = tell.slice(1)
yield value
} else {
const dispose: Dispose<T> = new Dispose
ask.push(dispose)
const result = await dispose.done
yield result
}
}
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment