Skip to content

Instantly share code, notes, and snippets.

@camilleriluke
Created July 2, 2018 15:59
Show Gist options
  • Save camilleriluke/493463aa9e3ebb4d5c8d45e21434fb71 to your computer and use it in GitHub Desktop.
Save camilleriluke/493463aa9e3ebb4d5c8d45e21434fb71 to your computer and use it in GitHub Desktop.
rx-test
import { concat, defer, empty, of } from "rxjs";
import { flatMap, reduce, scan, take } from "rxjs/operators";
/**
 * Fake paginated fetch used by the test below.
 *
 * Pages below 5 resolve to `{ nextPage, results }`, where `results` holds the
 * page number twice. Page 5 and above resolve to `{ results: "done" }` with no
 * `nextPage`, which marks the end of the sequence.
 *
 * Even page numbers resolve after a 0 ms timer; odd page numbers wait
 * 100 ms multiplied by the page number.
 *
 * @param {number} [pageNumber=0] - Page to fetch.
 * @returns {Promise<{nextPage?: number, results: number[] | string}>}
 */
const fetchPage = (pageNumber = 0) => {
  const delayMs = 100 * (pageNumber % 2 === 0 ? 0 : pageNumber);
  const isLastPage = pageNumber >= 5;

  return new Promise(resolve => {
    setTimeout(() => {
      const payload = isLastPage
        ? { results: "done" }
        : { nextPage: pageNumber + 1, results: [pageNumber, pageNumber] };
      resolve(payload);
    }, delayMs);
  });
};
// Walks the fake paginated API recursively with RxJS: each fetched page emits
// its `results` and, when it carries a `nextPage`, triggers the fetch of that
// page. All emitted results are then flattened into a single array.
//
// The callback is deliberately NOT `async`: completion is signalled through
// `done`, and Jest rejects tests that both take `done` and return a promise.
test.only("rx", done => {
  // defer() postpones the fetchPage call until the observable is subscribed to.
  const fetchPageObservable = (x = 0) => defer(() => fetchPage(x));

  // Array#concat flattens array values and appends non-array values as-is.
  const collect = (acc, v) => acc.concat(v);

  // Emits the current page's results first, then recursively streams the next
  // page (if any) through this same operator.
  const streamNextPage = flatMap(page => {
    const records = of(page.results);
    const nextPageObservable = page.nextPage
      ? fetchPageObservable(page.nextPage)
      : empty();
    return concat(records, nextPageObservable.pipe(streamNextPage));
  });

  fetchPageObservable()
    .pipe(
      streamNextPage,
      take(12),
      // Debug pass-through: logs the previously emitted value and forwards the
      // current one unchanged.
      scan((acc, n) => {
        console.log(acc);
        return n;
      }),
      reduce(collect, [])
    )
    .subscribe({
      next: (...args) => {
        console.log("done", ...args);
        done();
      },
      // Fail the test right away with the real cause instead of waiting for
      // the Jest timeout when the stream errors.
      error: err => done(err),
    });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment