Last active
April 7, 2016 23:28
-
-
Save Dorus/71c8f954e9e9e13dc0183f0197279408 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RxTest.RxTest; | |
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import rx.Notification;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class delaySub { | |
private long delay = 0; | |
private Observable<Long> getSubDelayObs() { | |
return Observable.just(delay).delaySubscription(() -> getDelayObs()); | |
} | |
Long start = System.currentTimeMillis(); | |
private Observable<Long> getDelayObs() { | |
final long del = delay++; | |
return Observable.timer(delay * del, TimeUnit.SECONDS) | |
.doOnSubscribe(() -> System.out.println("starting " + del + " " + (System.currentTimeMillis() - start))) | |
.doOnCompleted(() -> System.out.println("emit " + del + " " + (System.currentTimeMillis() - start))); | |
} | |
public Observable<String> m1() { | |
return Observable | |
.mergeDelayError( | |
getSubDelayObs().doOnEach(printAction("1a")).flatMap(getResult()).doOnEach(printAction1("1b")).retry(), | |
getSubDelayObs().doOnEach(printAction("2a")).flatMap(getResult()).doOnEach(printAction1("2b")).retry(), | |
getSubDelayObs().doOnEach(printAction("3a")).flatMap(getResult()).doOnEach(printAction1("3b")).retry()) | |
.doOnEach(printAction1("4")); | |
} | |
private Action1<Notification<? super Long>> printAction(String name) { | |
return action -> { | |
if (action.isOnCompleted()) { | |
System.out.println(name + " Completed"); | |
} | |
if (action.isOnError()) { | |
System.out.println(name + " Error " + action.getThrowable().getMessage()); | |
} | |
if (action.isOnNext()) { | |
System.out.println(name + " Next" + action.getValue()); | |
} | |
}; | |
} | |
private Action1<Notification<? super String>> printAction1(String name) { | |
return action -> { | |
if (action.isOnCompleted()) { | |
System.out.println(name + " Completed"); | |
} | |
if (action.isOnError()) { | |
System.out.println(name + " Error " + action.getThrowable().getMessage()); | |
} | |
if (action.isOnNext()) { | |
System.out.println(name + " Next" + action.getValue()); | |
} | |
}; | |
} | |
private int count = 0; | |
private Func1<Long, Observable<String>> getResult() { | |
return e -> { | |
int cur = count++; | |
switch (cur) { | |
case 0: | |
case 1: | |
case 2: | |
return Observable.error(new Exception("Something wrong!")); | |
case 3: | |
return Observable.just("Sucess").delay(1, TimeUnit.SECONDS); | |
default: | |
return Observable.just("Done!"); | |
} | |
}; | |
} | |
public static void main(String[] args) throws IOException { | |
new delaySub().m1().take(1).subscribe(e -> System.out.println(e), err -> System.out.println(err.getMessage()), | |
() -> System.out.println("Completed")); | |
System.in.read(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment