Skip to content

Instantly share code, notes, and snippets.

@emlautarom1
Created January 29, 2022 01:05
Show Gist options
  • Save emlautarom1/7a8bda5be669bd917c4fe98b1effced2 to your computer and use it in GitHub Desktop.
Save emlautarom1/7a8bda5be669bd917c4fe98b1effced2 to your computer and use it in GitHub Desktop.
Depaginate & share infinite stream
import { defer, from, Observable, of } from 'rxjs';
import { take, expand, mergeMap, tap, concatWith } from 'rxjs/operators';
/** One page of results, as emitted by `fetchPage`. */
interface Result {
// Index of the page this result belongs to (`fetchPage` uses 0 for the first page).
page: number;
// The items contained in this page.
items: number[];
}
/**
 * Simulates fetching a single page of results.
 *
 * Stand-in for a real HTTP request: the page is generated locally and holds
 * three consecutive integers starting at `3 * page`.
 *
 * @param page Zero-based page index; defaults to the first page.
 * @returns An observable that emits the `Result` for `page` and logs
 *   "Fetching page <n>" when that value is emitted.
 */
function fetchPage(page: number = 0): Observable<Result> {
  const firstItem = 3 * page;
  const items = Array.from({ length: 3 }, (_, offset) => offset + firstItem);
  const response: Result = { page, items };

  // The log lives in `tap`, so it fires on emission (per subscription),
  // not when `fetchPage` itself is called.
  const logFetch = tap<Result>(() => console.log('Fetching page ' + page));

  return of(response).pipe(logFetch);
}
/**
 * Builds a shareable, lazily de-paginated stream of items.
 *
 * Each subscriber first receives the items that previous subscriptions have
 * already pulled (replayed from an in-memory cache) and then keeps requesting
 * pages one after another, starting at the first page that is not cached yet.
 * Pages keep being requested for as long as the subscription is alive, so
 * callers are expected to bound the stream themselves (e.g. with `take`).
 *
 * @returns An observable emitting the items of page 0, 1, 2, ... in order.
 */
function mkInfiniteStream(): Observable<number> {
  // Index of the first page whose items are NOT yet stored in `itemsRead`.
  let lastPage = 0;
  // Items of pages [0, lastPage). The array is replaced rather than mutated so
  // that a snapshot taken by one subscription is never changed underneath it.
  let itemsRead: number[] = [];

  return defer(() => {
    // Take a consistent snapshot at subscription time: replay exactly what is
    // cached, then continue fetching from the page right after it.
    const cachedItems = itemsRead;
    const firstUncachedPage = lastPage;

    return from(cachedItems).pipe(
      concatWith(
        fetchPage(firstUncachedPage).pipe(
          // After every page, request the following one.
          expand((res) => fetchPage(res.page + 1)),
          tap((res) => {
            // Overlapping subscriptions may fetch the same page. Only the
            // first arrival of the next expected page extends the cache;
            // otherwise items would be stored twice and pages skipped.
            if (res.page !== lastPage) {
              return;
            }
            lastPage = res.page + 1;
            itemsRead = [...itemsRead, ...res.items];
          }),
          // Flatten each page into its individual items.
          mergeMap((res) => from(res.items))
        )
      )
    );
  });
}
// Demo: one shared stream, consumed by several subscribers at different times.
const infinite$ = mkInfiniteStream();

/**
 * Subscribes to the shared stream, takes its first `count` items and logs
 * each one after the given `prefix`.
 */
const logFirst = (count: number, prefix: string): void => {
  infinite$.pipe(take(count)).subscribe((item) => console.log(prefix, item));
};

logFirst(10, '[1] got ');

// Assume that later we have new subscribers
setTimeout(() => logFirst(5, '[2] got '), 1000);
setTimeout(() => logFirst(4, '[3] got '), 1500);
setTimeout(() => logFirst(20, '[4] got '), 2000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment