Created
August 4, 2014 17:24
-
-
Save benjchristensen/970a9c02ac9423bfbb0c to your computer and use it in GitHub Desktop.
Multicasting a cold, infinite Observable and using onBackpressureBuffer/Drop to handle overflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.CountDownLatch; | |
import rx.Observable; | |
import rx.observables.ConnectableObservable; | |
import rx.schedulers.Schedulers; | |
/** | |
* This shows how a "reactive pull" compliant "cold" Observable, when multicasted, becomes "hot" and each Subscriber | |
* must then choose its strategy for overflow. | |
*/ | |
public class MulticastColdInfiniteBackpressureExample { | |
public static void main(String[] args) { | |
final CountDownLatch latch = new CountDownLatch(2); | |
// multicast a "cold" source | |
ConnectableObservable<Integer> source = getData(1).publish(); | |
/** | |
* This buffers so will get the first 2000 of 5000 emitted | |
*/ | |
source.onBackpressureBuffer().observeOn(Schedulers.computation()) | |
.map(i -> "one => " + i).take(2000).finallyDo(() -> latch.countDown()).forEach(System.out::println); | |
/** | |
* This drops, so will receive with first 1024 (size of internal buffer) and then pick up in the stream again | |
* when it can consume more and get large values like 159023. | |
*/ | |
source.onBackpressureDrop().observeOn(Schedulers.computation()) | |
.map(i -> "two => " + i).take(2000).finallyDo(() -> latch.countDown()).forEach(System.out::println); | |
source.connect(); | |
try { | |
latch.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* Not actually infinite, but large enough to behave such for this example. | |
*/ | |
public static Observable<Integer> getData(int id) { | |
return Observable.range(id, Integer.MAX_VALUE).subscribeOn(Schedulers.computation()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment