Last active
December 13, 2018 12:15
-
-
Save elbakramer/d4ef74292d553da728b37c9e1ec3dff6 to your computer and use it in GitHub Desktop.
Convert an RxJS Observable to an AsyncIterator in TypeScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Observer } from 'rxjs';
import { interval } from 'rxjs';
import { take, tap, map } from 'rxjs/operators';
/**
 * Adapts an RxJS `Observable` to the `AsyncIterator` protocol.
 *
 * The observable is subscribed to immediately. Emissions that arrive before a
 * consumer asks for them are buffered (without bound); `next()` calls that
 * arrive before an emission are parked and settled in call order.
 *
 * - A value emission settles one `next()` with `{ value, done: false }`.
 * - Completion settles one `next()` with `{ value: undefined, done: true }`.
 * - An error rejects one `next()`; subsequent calls report `done: true`.
 * - `return()` and `throw()` unsubscribe and end the iteration.
 *
 * Whenever the stream ends (completion, error, `return()` or `throw()`), every
 * `next()` promise that is still parked is settled as done, so none can hang.
 *
 * @param observable - Source stream to iterate.
 * @returns An async iterator over the values emitted by `observable`.
 */
function observableToAsyncIterator<T>(observable: Observable<T>): AsyncIterator<T> {
  type Result = IteratorResult<T>;

  /** Settlement handles of a promise already handed to a consumer. */
  interface Executor {
    resolve(result: Result): void;
    reject(reason: unknown): void;
  }

  // Outcomes produced by the observable that no consumer has asked for yet.
  const pushes: Promise<Result>[] = [];
  // Consumers waiting for an outcome the observable has not produced yet.
  const pulls: Executor[] = [];
  // True once the consumer ended the iteration or drained a finished source.
  let done = false;
  // True once the observer has seen `complete` or `error`.
  let terminated = false;

  const finished = (value?: unknown): Result => ({ value, done: true });

  function createPromiseWithExecutor(): { promise: Promise<Result>; executor: Executor } {
    // The Promise constructor runs its callback synchronously, so both
    // handles are assigned before they are read below.
    let resolve!: Executor['resolve'];
    let reject!: Executor['reject'];
    const promise = new Promise<Result>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    return { promise, executor: { resolve, reject } };
  }

  /** Hands a result to the oldest waiting consumer, or buffers it. */
  function deliver(result: Result): void {
    const waiting = pulls.shift();
    if (waiting) {
      waiting.resolve(result);
    } else {
      pushes.push(Promise.resolve<Result>(result));
    }
  }

  /** Hands an error to the oldest waiting consumer, or buffers it. */
  function fail(err: unknown): void {
    const waiting = pulls.shift();
    if (waiting) {
      waiting.reject(err);
      return;
    }
    const rejected = Promise.reject<Result>(err);
    // Mark the buffered rejection as handled so the runtime does not report
    // an unhandled rejection before a consumer gets around to reading it.
    rejected.catch(() => undefined);
    pushes.push(rejected);
  }

  /** Settles every still-parked consumer as done; nothing more will arrive. */
  function releasePulls(): void {
    for (const waiting of pulls.splice(0)) {
      waiting.resolve(finished());
    }
  }

  const observer: Observer<T> = {
    next(value) {
      deliver({ value, done: false });
    },
    complete() {
      terminated = true;
      deliver(finished());
      releasePulls();
    },
    error(err) {
      terminated = true;
      fail(err);
      releasePulls();
    },
  };

  const subscription = observable.subscribe(observer);

  /** Ends the iteration from the consumer side. */
  function close(): void {
    if (!subscription.closed) {
      subscription.unsubscribe();
    }
    done = true;
    // Buffered outcomes are unreachable once `done` is set; drop them.
    pushes.length = 0;
    releasePulls();
  }

  const iterator: AsyncIterator<T> = {
    next() {
      if (done) {
        return Promise.resolve(finished());
      }
      const buffered = pushes.shift();
      if (buffered) {
        return buffered;
      }
      if (!terminated && !subscription.closed) {
        const { promise, executor } = createPromiseWithExecutor();
        pulls.push(executor);
        return promise;
      }
      done = true;
      return Promise.resolve(finished());
    },
    return(value) {
      close();
      return Promise.resolve(finished(value));
    },
    throw(err) {
      close();
      return Promise.reject(err);
    },
  };

  return iterator;
}
/**
 * Method form of `observableToAsyncIterator`, meant to be installed on
 * `Observable.prototype` so that the receiver is the observable to iterate.
 *
 * @returns An async iterator over the receiver's emissions.
 */
function toAsyncIterator<T>(this: Observable<T>): AsyncIterator<T> {
  return observableToAsyncIterator(this);
}

// Patch the prototype so observables can be consumed with `for await`.
Observable.prototype[Symbol.asyncIterator] = toAsyncIterator;
/**
 * Native async generator that yields `0 .. n - 1`, logging each value before
 * yielding it. Serves as the reference behaviour the observable-backed
 * iterator is compared against.
 *
 * @param n - Number of values to yield.
 */
async function* asyncIteratorRange(n: number): AsyncIterableIterator<number> {
  for (const i of Array(n).keys()) {
    console.log('inside iterator:', i);
    yield i;
  }
}
/**
 * Native async generator that yields `0` and then throws the next index
 * instead of yielding it. The thrown value is the bare index, which is what
 * the error-handling demos log.
 *
 * @param n - Upper bound on the number of indices visited.
 */
async function* asyncIteratorThatThrowsAfterZero(n: number): AsyncIterableIterator<number> {
  for (const i of Array(n).keys()) {
    console.log('inside iterator:', i);
    if (i > 0) {
      throw i;
    }
    yield i;
  }
}
/**
 * Observable counterpart of `asyncIteratorRange`: emits `0 .. n - 1`, one
 * value every 50 ms, logging each value as it passes through.
 *
 * @param n - Number of values to emit before completing.
 */
function observableRange(n: number): Observable<number> {
  return interval(50).pipe(
    tap(i => {
      console.log('inside observable:', i);
    }),
    take(n),
  );
}
/**
 * Observable counterpart of `asyncIteratorThatThrowsAfterZero`: emits `0`,
 * then errors with the next index (thrown from inside `map`).
 *
 * @param n - Maximum number of values let through by `take`.
 */
function observableThatThrowsAfterZero(n: number): Observable<number> {
  return interval(50).pipe(
    map(i => {
      console.log('inside observable:', i);
      if (i > 0) {
        throw i;
      }
      return i;
    }),
    take(n),
  );
}
/**
 * Tests basic functionality: drains `iterable` with `for await`, logging each
 * item and then `'done'`.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeSimple(iterable) {
  for await (const item of iterable) {
    console.log('inside consume:', item);
  }
  console.log('done');
}
/**
 * Tests basic error handling: drains `iterable` with `for await` and logs any
 * error raised by the source instead of propagating it, then logs `'done'`.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeCatch(iterable) {
  try {
    for await (const item of iterable) {
      console.log('inside consume:', item);
    }
  } catch (e) {
    console.log('inside consume (error):', e);
  }
  console.log('done');
}
/**
 * Tests iterator `return` behaviour: breaks out of the `for await` loop at the
 * first item greater than zero, which makes the loop call the iterator's
 * `return()` method, then logs `'done'`.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeBreak(iterable) {
  for await (const item of iterable) {
    console.log('inside consume:', item);
    if (item > 0) {
      break;
    }
  }
  console.log('done');
}
/**
 * Tests behaviour when the loop body throws: the first item greater than zero
 * is thrown from inside the `for await` loop, caught outside it and logged,
 * followed by `'done'`.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeThrow(iterable) {
  try {
    for await (const item of iterable) {
      console.log('inside consume:', item);
      if (item > 0) {
        throw item;
      }
    }
  } catch (e) {
    console.log('inside consume (error):', e);
  }
  console.log('done');
}
/**
 * Tests complex behaviour by driving the iterator by hand: five `next()`
 * calls, then `throw()`, two more `next()` calls, `return()`, and two final
 * `next()` calls. Every result is logged; the error from `throw()` is caught
 * and logged as well.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeComplexThrowReturn(iterable) {
  const it = iterable[Symbol.asyncIterator]();
  console.log(await it.next());
  console.log(await it.next());
  console.log(await it.next(true));
  console.log(await it.next());
  console.log(await it.next());
  try {
    console.log(await it.throw(true));
  } catch (e) {
    console.log(e);
  }
  console.log(await it.next());
  console.log(await it.next());
  console.log(await it.return(true));
  console.log(await it.next());
  console.log(await it.next());
}
/**
 * Tests complex behaviour by driving the iterator by hand: five `next()`
 * calls, then `return()`, then two more `next()` calls. Every result is
 * logged.
 *
 * The parameter is left untyped on purpose: callers pass both native async
 * generators and Observables whose async-iterator method is patched in at
 * runtime.
 */
async function consumeComplexReturn(iterable) {
  const it = iterable[Symbol.asyncIterator]();
  console.log(await it.next());
  console.log(await it.next());
  console.log(await it.next(true));
  console.log(await it.next());
  console.log(await it.next());
  console.log(await it.return(true));
  console.log(await it.next());
  console.log(await it.next());
}
/**
 * Runs every consumer scenario twice, first against a native async generator
 * and then against the equivalent Observable, bracketing each run with
 * start/end log lines so the two outputs can be compared side by side.
 */
async function main() {
  console.log('async iterator simple start');
  await consumeSimple(asyncIteratorRange(10));
  console.log('async iterator simple end');

  console.log('observable simple start');
  await consumeSimple(observableRange(10));
  console.log('observable simple end');

  console.log('async iterator catch start');
  await consumeCatch(asyncIteratorThatThrowsAfterZero(10));
  console.log('async iterator catch end');

  console.log('observable catch start');
  await consumeCatch(observableThatThrowsAfterZero(10));
  console.log('observable catch end');

  console.log('async iterator break start');
  await consumeBreak(asyncIteratorRange(10));
  console.log('async iterator break end');

  console.log('observable break start');
  await consumeBreak(observableRange(10));
  console.log('observable break end');

  console.log('async iterator throw start');
  await consumeThrow(asyncIteratorRange(10));
  console.log('async iterator throw end');

  console.log('observable throw start');
  await consumeThrow(observableRange(10));
  console.log('observable throw end');

  console.log('async iterator complex throw start');
  await consumeComplexThrowReturn(asyncIteratorRange(10));
  console.log('async iterator complex throw end');

  console.log('observable complex throw start');
  await consumeComplexThrowReturn(observableRange(10));
  console.log('observable complex throw end');

  console.log('async iterator complex return start');
  await consumeComplexReturn(asyncIteratorRange(10));
  console.log('async iterator complex return end');

  console.log('observable complex return start');
  await consumeComplexReturn(observableRange(10));
  // The closing marker previously repeated 'start'.
  console.log('observable complex return end');
}

// Surface a failure of the demo run instead of leaving a floating promise.
main().catch(err => {
  console.error('main failed:', err);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment