Created
January 18, 2018 13:34
-
-
Save chitacan/6d10835ee00697d090ceb44b4327058d to your computer and use it in GitHub Desktop.
rxjs back pressure https://github.com/ReactiveX/rxjs/issues/71#issuecomment-228824763
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this behavior subject is basically your "give me the next batch" mechanism. | |
// in this example, we're going to make 5 async requests back to back before requesting more. | |
const BATCH_SIZE = 5; | |
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items | |
// for every request, pump out a stream of events that represent how many you have left to fulfill | |
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1)) | |
// then concat map that into an observable of what you want to control with backpressure | |
// you might have some parameterization here you need to handle, this example is simplified | |
// handle side effects with a `do` block | |
.concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining) | |
// narrow it down to when there are no more left to request, | |
// and pump another batch request into the BehaviorSubject | |
.filter(remaining => remaining === 0) | |
.mapTo(BATCH_SIZE) | |
.subscribe(requests); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment