Skip to content

Instantly share code, notes, and snippets.

@truedem
Created May 27, 2017 10:03
Show Gist options
  • Select an option

  • Save truedem/b844f507235efcfa68f7a2491c624cd3 to your computer and use it in GitHub Desktop.

Select an option

Save truedem/b844f507235efcfa68f7a2491c624cd3 to your computer and use it in GitHub Desktop.
Throttle all items except the first in an RxJava Observable
// Source: https://github.com/ReactiveX/RxJava/issues/1323#issuecomment-45149897
// Thank you, akarnokd!
/**
 * Console demo that sends the first item of a stream around the debounce stage applied to
 * every later item, and prints whatever the switched result emits.
 *
 * <p>Two scenarios are run back to back: one pushes all three items without pausing, the
 * other sleeps after the first item before pushing the remaining two.
 */
public class ThrottleNotFirst {

    /** Debounce window and per-item delay, in milliseconds. */
    private static final long WINDOW_MS = 100;

    /** How long the main thread sleeps to let asynchronous emissions arrive, in milliseconds. */
    private static final long SETTLE_MS = 300;

    /**
     * Runs the scenario without a pause after the first item, then the one with a pause.
     *
     * @param args ignored
     * @throws Exception if a sleep on the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        for (boolean pauseAfterFirst : new boolean[] {false, true}) {
            runScenario(pauseAfterFirst);
        }
    }

    /**
     * Builds a fresh pipeline, pushes "One", "Two" and "Three" into it and prints a separator.
     *
     * @param pauseAfterFirst whether to sleep between pushing "One" and pushing "Two"
     * @throws InterruptedException if a sleep is interrupted
     */
    private static void runScenario(boolean pauseAfterFirst) throws InterruptedException {
        PublishSubject<String> typed = PublishSubject.create();
        PublishSubject<Observable<String>> inners = PublishSubject.create();

        // Only the very first item takes this path; it skips the debounce stage entirely.
        Observable<String> head = typed.take(1).observeOn(Schedulers.io());
        Observable<Observable<String>> first = head.map(s -> labelled("First ", s));

        // Everything after the first item goes through debounce before being labelled.
        Observable<String> tail =
                typed.skip(1).debounce(WINDOW_MS, TimeUnit.MILLISECONDS, Schedulers.io());
        Observable<Observable<String>> rest = tail.map(s -> labelled("Rest ", s));

        // Both paths feed one subject of inner observables, which switchOnNext flattens.
        Observable.merge(first, rest).subscribe(inners);
        Observable<String> result = Observable.switchOnNext(inners);
        result.subscribe(
                System.out::println,
                error -> error.printStackTrace(),
                () -> System.out.println("Done"));

        System.out.println("Typed: One");
        typed.onNext("One");
        if (pauseAfterFirst) {
            System.out.println("Waiting after one");
            Thread.sleep(SETTLE_MS);
        }
        typed.onNext("Two");
        typed.onNext("Three");

        // No completion signal is sent to the subject; just wait for pending output.
        Thread.sleep(SETTLE_MS);
        System.out.println("-----");
    }

    /**
     * Wraps {@code prefix + value} in a single-item observable delayed by {@link #WINDOW_MS}.
     *
     * @param prefix label placed in front of the value
     * @param value the item taken from the source
     * @return the delayed single-item observable
     */
    private static Observable<String> labelled(String prefix, String value) {
        return Observable.just(prefix + value).delay(WINDOW_MS, TimeUnit.MILLISECONDS);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment