Last active
February 22, 2017 17:23
Handling a hot Observable producing faster than the Subscriber with onBackpressureDrop
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/**
 * This demonstrates how to use onBackpressureDrop when a hot stream doesn't itself handle "reactive pull"
 */
public class ReactivePullHotOnBackpressureDrop {

    public static void main(String[] args) {
        hotStream().onBackpressureDrop() // without this it will receive a MissingBackpressureException
                .observeOn(Schedulers.computation())
                .map(ReactivePullHotOnBackpressureDrop::doExpensiveWork)
                .toBlocking().forEach(System.out::println);
    }

    /**
     * Simulate a "slow consumer" doing expensive work, slower than the producer is emitting.
     */
    public static int doExpensiveWork(int i) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // do nothing
        }
        return i;
    }

    /**
     * This is an artificial source to demonstrate an infinite stream that emits at random intervals
     */
    public static Observable<Integer> hotStream() {
        return Observable.create((Subscriber<? super Integer> s) -> {
            int i = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(i++);
                try {
                    // sleep for a random amount of time
                    // NOTE: Only using Thread.sleep here as an artificial demo.
                    Thread.sleep((long) (Math.random() * 10));
                } catch (Exception e) {
                    // do nothing
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }
}
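To make the "drop when there is no demand" idea concrete, here is a minimal, dependency-free sketch. It is not RxJava's operator code; `DropGate`, `request`, and `dropped` are hypothetical names chosen for illustration. It assumes a single producer thread calling `onNext`, with the downstream only ever adding demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical illustration only; not RxJava's actual onBackpressureDrop code. */
final class DropGate<T> {
    // Outstanding demand signalled by the downstream consumer.
    private final AtomicLong requested = new AtomicLong();
    private final Consumer<T> downstream;
    // Only touched by the single producer thread in this sketch.
    private long dropped;

    DropGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Downstream signals that it can accept n more items. */
    void request(long n) {
        requested.addAndGet(n);
    }

    /** Upstream pushes a value; it is forwarded only if demand is outstanding. */
    void onNext(T value) {
        if (requested.get() > 0) {
            // Safe with one producer: request() only ever increases the counter.
            requested.decrementAndGet();
            downstream.accept(value);
        } else {
            dropped++; // no demand, so the value is discarded rather than buffered
        }
    }

    long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(received::add);

        gate.request(3);
        for (int i = 0; i < 10; i++) {
            gate.onNext(i); // 0, 1, 2 are delivered; 3..9 are dropped
        }
        gate.request(2);
        gate.onNext(10);
        gate.onNext(11);
        gate.onNext(12); // demand is exhausted again, so 12 is dropped

        System.out.println(received + " dropped=" + gate.dropped());
        // prints: [0, 1, 2, 10, 11] dropped=8
    }
}
```

The point of the sketch is that dropping is driven by how much the downstream has requested, not by how long the consumer takes per item.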
@benjchristensen nice snippet of code, very instructive. I ran this demo, but I can't understand how RxJava decides when to apply the backpressure operator. In theory, since the consumer sleeps for 100 ms per element, I would have expected it to process 1 element and then skip to element ~20, ~40, etc. When I run it, though, the first drop occurs somewhere around element 2800, which would mean to me that there is some sort of implicit queue of this length being created. Do I have this correct? Is there a way of controlling when the backpressure operators are applied?
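The numbers in the question can be checked with a rough estimate. The sketch below assumes, as my understanding of RxJava 1.x on the JVM rather than something stated in the gist, that `observeOn` requests an initial batch of 128 items and drains it before asking for more. It also uses the fact that `(long) (Math.random() * 10)` truncates to 0..9, giving a mean producer sleep of about 4.5 ms. `DropEstimate` and `firstValueAfterBatch` are hypothetical names.

```java
/** Back-of-the-envelope estimate; the batch size of 128 is an assumption, not taken from the gist. */
final class DropEstimate {

    /**
     * Estimates the index of the first value the consumer sees after it has
     * worked through an initial batch, ignoring all overhead except the sleeps.
     */
    static long firstValueAfterBatch(int batchSize, double consumerMsPerItem, double producerMsPerItem) {
        // Time the consumer needs to process the whole batch.
        double elapsedMs = batchSize * consumerMsPerItem;
        // Number of values the producer emits in that time.
        return Math.round(elapsedMs / producerMsPerItem);
    }

    public static void main(String[] args) {
        // Assumed batch of 128, 100 ms per item consumed, ~4.5 ms per item produced.
        System.out.println(firstValueAfterBatch(128, 100.0, 4.5)); // prints 2844

        // With a batch of 1 the estimate matches the "~20" expected in the question.
        System.out.println(firstValueAfterBatch(1, 100.0, 4.5)); // prints 22
    }
}
```

Under these assumptions the jump to roughly element 2800 comes from a request batch of about 128 items, not from a queue of 2800. If that is right, the batch size is the knob to turn; as far as I know, later 1.x releases offer an `observeOn` overload that takes a buffer size, but check the Javadoc for the version in use.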