Created
March 30, 2019 02:23
-
-
Save lhr0909/94d437aea8967bf02a06ca912823ab91 to your computer and use it in GitHub Desktop.
Conversion between AsyncIterable (AsyncIterator) and Observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function asyncIterableToObservable<T>(iterable: AsyncIterable<T>): Observable<T> { | |
return new Observable<T>( | |
(observer: Subscriber<T>) => | |
void (async () => { | |
try { | |
for await (const item of iterable) { | |
if (observer.closed) { | |
return; | |
} | |
observer.next(item); | |
} | |
observer.complete(); | |
} catch (e) { | |
observer.error(e); | |
} | |
})(), | |
); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// based on https://github.com/apollographql/graphql-subscriptions/blob/master/src/event-emitter-to-async-iterator.ts | |
function observableToAsyncIterable<T>( | |
observable: Observable<T>, | |
): AsyncIterable<T> { | |
const pullQueue: any[] = []; | |
const pushQueue: any[] = []; | |
let listening = true; | |
const pushValue = (value: T | undefined, done: boolean) => { | |
if (pullQueue.length !== 0) { | |
const resolve = pullQueue.shift(); | |
if (resolve) { | |
resolve({ value, done }); | |
} | |
} else { | |
pushQueue.push(value); | |
} | |
}; | |
const pullValue = () => { | |
return new Promise(resolve => { | |
if (pushQueue.length !== 0) { | |
resolve({ value: pushQueue.shift(), done: false }); | |
} else { | |
pullQueue.push(resolve); | |
} | |
}); | |
}; | |
const emptyQueue = () => { | |
if (listening) { | |
listening = false; | |
pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); | |
pullQueue.length = 0; | |
pushQueue.length = 0; | |
} | |
}; | |
const observer: Observer<T> = { | |
closed: false, | |
next(value) { | |
pushValue(value, this.closed); | |
}, | |
error(e) { | |
this.closed = true; | |
pushValue(e, this.closed); | |
}, | |
complete() { | |
this.closed = true; | |
pushValue(undefined, this.closed); | |
}, | |
}; | |
const subscription = observable.subscribe(observer); | |
return { | |
[Symbol.asyncIterator]() { | |
return { | |
async next() { | |
if (!listening) { | |
return this.return(); | |
} | |
const value = await pullValue(); | |
return value; | |
}, | |
return() { | |
emptyQueue(); | |
subscription.unsubscribe(); | |
return Promise.resolve({ value: {}, done: true } as IteratorResult<T>); | |
}, | |
throw(error) { | |
emptyQueue(); | |
subscription.unsubscribe(); | |
return Promise.reject(error); | |
}, | |
}; | |
}, | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment