Skip to content

Instantly share code, notes, and snippets.

@jooyunghan
Last active December 20, 2017 16:27
Show Gist options
  • Save jooyunghan/81d93f04a00325c63e4476e70f9ced5e to your computer and use it in GitHub Desktop.
Save jooyunghan/81d93f04a00325c63e4476e70f9ced5e to your computer and use it in GitHub Desktop.
const { Observable } = require('rxjs');
// Install createAsyncIterator as the Symbol.asyncIterator method of every
// Observable, which is what lets `for await (... of observable)` work below.
Observable.prototype[Symbol.asyncIterator] = createAsyncIterator;
/**
 * Adapts an Observable-like source into an async iterator.
 *
 * Must be invoked with `this` bound to an object exposing
 * `subscribe({ next, complete, error })` that returns a subscription with an
 * `unsubscribe()` method. The source is subscribed to immediately; values that
 * arrive before they are requested are buffered, and requests made before a
 * value arrives are parked until the source emits, completes or errors.
 *
 * @this {{ subscribe: Function }} the source to iterate over
 * @returns {{ next: Function, return: Function }} an async iterator whose
 *   `next()` and `return()` always return promises. `next()` rejects exactly
 *   once with the source's error (after buffered values are drained); every
 *   later call resolves to `{ value: undefined, done: true }`.
 */
function createAsyncIterator() {
  const waiters = []; // parked { resolve, reject } pairs from next() calls
  const values = []; // emissions not yet requested by the consumer
  let done = false;
  // Tracked separately from `error` so falsy error values are still reported.
  let hasError = false;
  let error = null;

  const finished = () => ({ value: undefined, done: true });

  // Marks the iteration as over and releases every parked next() call.
  const finish = () => {
    done = true;
    waiters.splice(0).forEach((waiter) => waiter.resolve(finished()));
  };

  const subscription = this.subscribe({
    next(value) {
      if (done || hasError) {
        return;
      }
      if (waiters.length > 0) {
        waiters.shift().resolve({ value, done: false });
      } else {
        values.push(value);
      }
    },
    complete() {
      finish();
    },
    error(err) {
      if (done || hasError) {
        return;
      }
      if (waiters.length > 0) {
        // Only the oldest waiter observes the failure; the rest see "done".
        // Uses the closure helper instead of `this.complete()` so it does not
        // depend on how the source binds `this` for observer callbacks.
        waiters.shift().reject(err);
        finish();
      } else {
        // Nobody is waiting: keep the error until buffered values are drained.
        hasError = true;
        error = err;
      }
    },
  });

  return {
    next() {
      if (values.length > 0) {
        return Promise.resolve({ value: values.shift(), done: false });
      }
      if (hasError) {
        const failure = error;
        hasError = false;
        error = null;
        done = true;
        return Promise.reject(failure);
      }
      if (done) {
        return Promise.resolve(finished());
      }
      return new Promise((resolve, reject) => {
        waiters.push({ resolve, reject });
      });
    },
    return(value) {
      // Early exit (e.g. `break` inside `for await`): drop buffered state,
      // release parked callers and stop listening to the source.
      values.length = 0;
      hasError = false;
      error = null;
      finish();
      subscription.unsubscribe();
      // The protocol requires a promise for an iterator-result object here.
      return Promise.resolve({ value, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
/**
 * Drains an async iterable, printing every item with console.log.
 *
 * If iteration fails, prints 'got it' followed by the error's message instead
 * of propagating the failure. Prints 'done' once iteration has ended either
 * way.
 *
 * @param {AsyncIterable<*>} obs - the source to iterate with `for await`
 * @returns {Promise<void>} settles after 'done' has been printed
 */
async function consume(obs) {
  const printAll = async (source) => {
    for await (const item of source) {
      console.log(item);
    }
  };

  try {
    await printAll(obs);
  } catch (failure) {
    const { message } = failure;
    console.log('got it', message);
  }

  console.log('done');
}
// async function consume2(obs) {
// const ai = obs[Symbol.asyncIterator]();
// ai.next().then(r => console.log(0, r), e => console.log(0, e.message));
// ai.next().then(r => console.log(1, r), e => console.log(1, e.message));
// ai.next().then(r => console.log(2, r), e => console.log(2, e.message));
// ai.next().then(r => console.log(3, r), e => console.log(3, e.message));
// }
// Demo run: pass an interval(100) observable limited by take(10) to consume,
// which iterates it with `for await` and logs each value it receives.
consume(Observable.interval(100).take(10));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment