Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Created September 21, 2020 11:52
Show Gist options
  • Save SergejIsbrecht/68fccabddff226dd5190b6a7e048f512 to your computer and use it in GitHub Desktop.
Save SergejIsbrecht/68fccabddff226dd5190b6a7e048f512 to your computer and use it in GitHub Desktop.
RxJava switchMap + observeOn/ subscribeOn
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
public class ObserveOnTest {
@Test
void name() throws Exception {
Scheduler appSched = Schedulers.from(Executors.newSingleThreadExecutor());
Scheduler ioSched = Schedulers.from(Executors.newSingleThreadExecutor());
ioSched.scheduleDirect(() -> System.out.println("first"));
Observable<Integer> t =
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) //
.flatMap(integers -> Observable.just(integers).subscribeOn(appSched))
.subscribeOn(appSched)
.switchMap(
integer -> {
System.out.println("switch " + integer);
return x(ioSched, integer);
})
.observeOn(appSched);
TestObserver<Integer> test = t.test();
test.awaitTerminalEvent();
}
private Observable<Integer> x(Scheduler sched, int q) {
return Observable.fromCallable(
() -> {
System.out.println(q + " working " + Thread.currentThread().getName());
Thread.sleep(1_000);
System.out.println("done " + q);
return 42;
})
.doOnDispose(() -> System.out.println("dispose " + q))
.doOnEach(integerNotification -> System.out.println("q " + integerNotification.toString()))
.doOnSubscribe(disposable -> System.out.println("sub " + q + " - " + Thread.currentThread().getName()))
.subscribeOn(sched);
}
}
@SergejIsbrecht
Copy link
Author

Output may vary:

first
switch 1
switch 2
switch 3
switch 4
switch 5
switch 6
switch 7
switch 8
switch 9
switch 10
sub 2 - pool-3-thread-1
dispose 2
sub 10 - pool-3-thread-1
10 working pool-3-thread-1
done 10
q OnNextNotification[42]
q OnCompleteNotification

Process finished with exit code 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment