Skip to content

Instantly share code, notes, and snippets.

@elbakramer
Last active December 13, 2018 12:15
Show Gist options
  • Save elbakramer/d4ef74292d553da728b37c9e1ec3dff6 to your computer and use it in GitHub Desktop.
Save elbakramer/d4ef74292d553da728b37c9e1ec3dff6 to your computer and use it in GitHub Desktop.
Convert an RxJS Observable to an AsyncIterator in TypeScript
import { Observable, Observer } from 'rxjs';
import { interval } from 'rxjs';
import { take, tap, map } from 'rxjs/operators';
/**
 * Adapts an RxJS `Observable` to the `AsyncIterator` protocol.
 *
 * The observable is subscribed to immediately. Notifications that arrive
 * before the consumer asks for them are buffered in arrival order; `next()`
 * calls made while nothing is buffered are parked and settled, in call order,
 * as notifications arrive.
 *
 * @param observable - The source to subscribe to.
 * @returns An iterator whose `next()` resolves with each emitted value,
 *   resolves with `done: true` once the source has completed, and rejects with
 *   the source's error. `return()` and `throw()` unsubscribe from the source
 *   and release every parked `next()` call with `done: true`.
 */
function observableToAsyncIterator<T>(observable: Observable<T>): AsyncIterator<T> {
  /** A notification from the source that no `next()` call has consumed yet. */
  type Notification =
    | { kind: 'value'; value: T }
    | { kind: 'complete' }
    | { kind: 'error'; error: unknown };

  /** Settle callbacks of a `next()` promise that is waiting for a notification. */
  interface Waiter {
    resolve(result: IteratorResult<T>): void;
    reject(reason: unknown): void;
  }

  // Invariant: at most one of these two queues is non-empty at any time.
  const buffered: Notification[] = [];
  const waiting: Waiter[] = [];
  // True once the source has sent `complete` or `error`.
  let sourceEnded = false;
  // True once the iterator itself is exhausted or was closed by the consumer.
  let done = false;

  const finished = (): IteratorResult<T> => ({ value: undefined, done: true });

  /** Settles one waiter with the outcome described by `notification`. */
  function settle(waiter: Waiter, notification: Notification): void {
    switch (notification.kind) {
      case 'value':
        waiter.resolve({ value: notification.value, done: false });
        break;
      case 'complete':
        waiter.resolve(finished());
        break;
      case 'error':
        waiter.reject(notification.error);
        break;
    }
  }

  /** Resolves every parked `next()` call with `done: true`. */
  function releaseWaiters(): void {
    for (const waiter of waiting.splice(0)) {
      waiter.resolve(finished());
    }
  }

  /** Routes a source notification to a parked `next()` call, or buffers it. */
  function deliver(notification: Notification): void {
    if (done || sourceEnded) {
      return;
    }
    const terminal = notification.kind !== 'value';
    sourceEnded = terminal;

    const waiter = waiting.shift();
    if (waiter === undefined) {
      // Buffered as plain data, not as a promise: a rejected promise that
      // sits in a queue with no handler is reported as an unhandled rejection.
      buffered.push(notification);
      return;
    }
    settle(waiter, notification);
    if (terminal) {
      // Nothing more will arrive, so the remaining parked calls must not hang.
      done = true;
      releaseWaiters();
    }
  }

  const observer: Observer<T> = {
    next(value) {
      deliver({ kind: 'value', value });
    },
    complete() {
      deliver({ kind: 'complete' });
    },
    error(err) {
      deliver({ kind: 'error', error: err });
    },
  };
  const subscription = observable.subscribe(observer);

  /** Produces the promise for one `next()` call. */
  function pull(): Promise<IteratorResult<T>> {
    if (done) {
      return Promise.resolve(finished());
    }
    const notification = buffered.shift();
    if (notification !== undefined) {
      if (notification.kind !== 'value') {
        done = true;
      }
      return new Promise<IteratorResult<T>>((resolve, reject) => {
        settle({ resolve, reject }, notification);
      });
    }
    if (subscription.closed) {
      done = true;
      return Promise.resolve(finished());
    }
    return new Promise<IteratorResult<T>>((resolve, reject) => {
      waiting.push({ resolve, reject });
    });
  }

  /** Ends iteration on behalf of the consumer and tears down the subscription. */
  function close(): void {
    done = true;
    buffered.length = 0;
    if (!subscription.closed) {
      subscription.unsubscribe();
    }
    releaseWaiters();
  }

  const iterator: AsyncIterator<T> = {
    next() {
      return pull();
    },
    return(value) {
      close();
      return Promise.resolve<IteratorResult<T>>({ value, done: true });
    },
    throw(err) {
      close();
      return Promise.reject(err);
    },
  };
  return iterator;
}
/**
 * `Symbol.asyncIterator` implementation meant to be installed on
 * `Observable.prototype`; the receiver (`this`) is the observable to iterate.
 *
 * @returns An async iterator over the receiver, built by
 *   `observableToAsyncIterator`.
 */
function toAsyncIterator<T>(this: Observable<T>): AsyncIterator<T> {
  return observableToAsyncIterator(this);
}
// Install toAsyncIterator as the Symbol.asyncIterator method of every Observable,
// which is the method `for await` looks up on the object it iterates.
Observable.prototype[Symbol.asyncIterator] = toAsyncIterator;
/**
 * Async generator that logs and then yields each index of an `n`-slot array,
 * i.e. 0, 1, ..., n - 1.
 */
async function* asyncIteratorRange(n) {
  const indices = Array(n).keys();
  for (const index of indices) {
    console.log('inside iterator:', index);
    yield index;
  }
}
/**
 * Async generator that logs each index of an `n`-slot array, yields index 0,
 * and throws the index itself as soon as it reaches an index greater than 0.
 */
async function* asyncIteratorThatThrowsAfterZero(n) {
  const indices = Array(n).keys();
  for (const index of indices) {
    console.log('inside iterator:', index);
    const pastZero = index > 0;
    if (pastZero) {
      throw index;
    }
    yield index;
  }
}
/**
 * Builds an observable from `interval(50)` that logs every tick and is
 * limited to the first `n` ticks by `take(n)`.
 */
function observableRange(n) {
  return interval(50).pipe(
    tap(tick => {
      console.log('inside observable:', tick);
    }),
    take(n),
  );
}
/**
 * Builds an observable from `interval(50)` whose mapping step logs every tick,
 * passes tick 0 through, and throws the tick itself for any tick greater
 * than 0. The result is limited by `take(n)`.
 */
function observableThatThrowsAfterZero(n) {
  return interval(50).pipe(
    map(tick => {
      console.log('inside observable:', tick);
      const pastZero = tick > 0;
      if (pastZero) {
        throw tick;
      }
      return tick;
    }),
    take(n),
  );
}
/**
 * Tests basic functionality: drains `iterable` with `for await`, logging each
 * item, then logs `'done'`.
 */
async function consumeSimple(iterable) {
  for await (const element of iterable) {
    console.log('inside consume:', element);
  }
  console.log('done');
}
/**
 * Tests basic error handling: drains `iterable` with `for await`, logging each
 * item; anything thrown during iteration is caught and logged. Always ends by
 * logging `'done'`.
 */
async function consumeCatch(iterable) {
  try {
    for await (const element of iterable) {
      console.log('inside consume:', element);
    }
  } catch (failure) {
    console.log('inside consume (error):', failure);
  }
  console.log('done');
}
/**
 * Tests iterator return behavior: logs each item and breaks out of the
 * `for await` loop at the first item greater than 0, then logs `'done'`.
 */
async function consumeBreak(iterable) {
  for await (const element of iterable) {
    console.log('inside consume:', element);
    const shouldStop = element > 0;
    if (shouldStop) {
      break;
    }
  }
  console.log('done');
}
/**
 * Tests iterator throw behavior: logs each item and throws the first item
 * greater than 0 from inside the `for await` loop; the thrown value is caught
 * and logged. Always ends by logging `'done'`.
 */
async function consumeThrow(iterable) {
  try {
    for await (const element of iterable) {
      console.log('inside consume:', element);
      const shouldThrow = element > 0;
      if (shouldThrow) {
        throw element;
      }
    }
  } catch (failure) {
    console.log('inside consume (error):', failure);
  }
  console.log('done');
}
/**
 * Tests complex behavior by driving the async iterator by hand and logging
 * the outcome of every call, in this order: five `next()` calls (the third
 * one passing `true`), one `throw(true)` whose rejection is caught and logged,
 * two `next()` calls, one `return(true)`, and two final `next()` calls.
 */
async function consumeComplexThrowReturn(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  // Awaits one iterator call and logs what it settled with.
  const report = async (pending) => {
    console.log(await pending);
  };

  await report(iterator.next());
  await report(iterator.next());
  await report(iterator.next(true));
  await report(iterator.next());
  await report(iterator.next());
  try {
    await report(iterator.throw(true));
  } catch (failure) {
    console.log(failure);
  }
  await report(iterator.next());
  await report(iterator.next());
  await report(iterator.return(true));
  await report(iterator.next());
  await report(iterator.next());
}
/**
 * Tests complex behavior by driving the async iterator by hand and logging
 * the outcome of every call, in this order: five `next()` calls (the third
 * one passing `true`), one `return(true)`, and two final `next()` calls.
 */
async function consumeComplexReturn(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  // Awaits one iterator call and logs what it settled with.
  const report = async (pending) => {
    console.log(await pending);
  };

  await report(iterator.next());
  await report(iterator.next());
  await report(iterator.next(true));
  await report(iterator.next());
  await report(iterator.next());
  await report(iterator.return(true));
  await report(iterator.next());
  await report(iterator.next());
}
/**
 * Runs every demo scenario in sequence. Each consumer is exercised first with
 * a native async generator and then with an observable, and each run is
 * bracketed by a "<label> start" and a "<label> end" log line.
 *
 * @returns A promise that settles once the last scenario has finished; it
 *   rejects if a consumer lets an error escape.
 */
async function main() {
  // Deriving both bracket lines from one label keeps them from drifting apart
  // (the last scenario used to log "start" twice).
  const runScenario = async <S>(
    label: string,
    consume: (source: S) => Promise<void>,
    makeSource: () => S,
  ): Promise<void> => {
    console.log(`${label} start`);
    await consume(makeSource());
    console.log(`${label} end`);
  };

  await runScenario('async iterator simple', consumeSimple, () => asyncIteratorRange(10));
  await runScenario('observable simple', consumeSimple, () => observableRange(10));

  await runScenario('async iterator catch', consumeCatch, () => asyncIteratorThatThrowsAfterZero(10));
  await runScenario('observable catch', consumeCatch, () => observableThatThrowsAfterZero(10));

  await runScenario('async iterator break', consumeBreak, () => asyncIteratorRange(10));
  await runScenario('observable break', consumeBreak, () => observableRange(10));

  await runScenario('async iterator throw', consumeThrow, () => asyncIteratorRange(10));
  await runScenario('observable throw', consumeThrow, () => observableRange(10));

  await runScenario('async iterator complex throw', consumeComplexThrowReturn, () => asyncIteratorRange(10));
  await runScenario('observable complex throw', consumeComplexThrowReturn, () => observableRange(10));

  await runScenario('async iterator complex return', consumeComplexReturn, () => asyncIteratorRange(10));
  await runScenario('observable complex return', consumeComplexReturn, () => observableRange(10));
}
// Start the demo. The promise returned by main() is neither awaited nor given
// a rejection handler here.
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment