Created
July 18, 2016 17:46
-
-
Save waliahimanshu/77083202877585473853d93ab02043a8 to your computer and use it in GitHub Desktop.
RxJava Fetching Observables In Parallel (without Parallel operator)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
//Reference example by benjchristensen | |
public class ParallelExecution { | |
public static void main(String[] args) { | |
System.out.println("------------ mergingAsync"); | |
mergingAsync(); | |
System.out.println("------------ mergingSync"); | |
mergingSync(); | |
System.out.println("------------ mergingSyncMadeAsync"); | |
mergingSyncMadeAsync(); | |
System.out.println("------------ flatMapExampleSync"); | |
flatMapExampleSync(); | |
System.out.println("------------ flatMapExampleAsync"); | |
flatMapExampleAsync(); | |
System.out.println("------------"); | |
} | |
private static void mergingAsync() { | |
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); | |
} | |
private static void mergingSync() { | |
// here you'll see the delay as each is executed synchronously | |
Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); | |
} | |
private static void mergingSyncMadeAsync() { | |
// if you have something synchronous and want to make it async, you can schedule it like this | |
// so here we see both executed concurrently | |
Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); | |
} | |
private static void flatMapExampleAsync() { | |
Observable.range(0, 5).flatMap(i -> { | |
return getDataAsync(i); | |
}).toBlocking().forEach(System.out::println); | |
} | |
private static void flatMapExampleSync() { | |
Observable.range(0, 5).flatMap(i -> { | |
return getDataSync(i); | |
}).toBlocking().forEach(System.out::println); | |
} | |
// artificial representations of IO work | |
static Observable<Integer> getDataAsync(int i) { | |
return getDataSync(i).subscribeOn(Schedulers.io()); | |
} | |
static Observable<Integer> getDataSync(int i) { | |
return Observable.create((Subscriber<? super Integer> s) -> { | |
// simulate latency | |
try { | |
Thread.sleep(1000); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
s.onNext(i); | |
s.onCompleted(); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment