Created
February 4, 2015 22:40
-
-
Save DanielGrech/142f1d4c12824f8fa673 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.functions.Func0; | |
/** | |
* Wraps a source observable for creating a new observable | |
* on every subscribe. | |
* <p/> | |
* For use with {@link Observable#defer(rx.functions.Func0)} | |
*/ | |
public class DeferredObservable<T> implements Func0<Observable<T>> { | |
private final Observable<T> source; | |
public DeferredObservable(Observable<T> source) { | |
this.source = source; | |
} | |
@Override | |
public Observable<T> call() { | |
return source; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import timber.log.Timber; | |
public class DelayFunction implements Func1<Integer, Observable<?>> { | |
@Override | |
public Observable<?> call(Integer delayInSeconds) { | |
Timber.v("Delaying observable by %s seconds", delayInSeconds); | |
return Observable.timer(delayInSeconds, TimeUnit.SECONDS); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sourceObservableToRetry | |
.compose(new ExponentialBackoffTransformer(3, new SecondsMultiplierBackoffStrategy(1)) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(Schedulers.immediate()) | |
.subscribe(new Action1<>(){}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.functions.Func1; | |
import rx.functions.Func2; | |
public class ExponentialBackoffTransformer<T> implements Observable.Transformer<T, T> { | |
private final int attempts; | |
private final Func1<Integer, Integer> attemptToDelayMap; | |
public ExponentialBackoffTransformer(int attempts, Func1<Integer, Integer> attemptToDelayMap) { | |
this.attempts = attempts; | |
this.attemptToDelayMap = attemptToDelayMap; | |
} | |
public ExponentialBackoffTransformer(int attempts) { | |
this(attempts, ONE_SECOND_DELAY_PER_ATTEMPT_STRATEGY); | |
} | |
@Override | |
public Observable<T> call(final Observable<T> source) { | |
if (attempts < 1) { | |
return source; | |
} | |
return Observable.defer(new DeferredObservable<>(source)) | |
.retryWhen(getRetryHandler()); | |
} | |
private Func1<Observable<? extends Throwable>, Observable<?>> getRetryHandler() { | |
final Observable<Integer> attemptRange = Observable.range(1, attempts); | |
final Func2<Throwable, Integer, Integer> zipFunction | |
= new Func2<Throwable, Integer, Integer>() { | |
@Override | |
public Integer call(Throwable throwable, Integer attemptCount) { | |
return attemptCount; | |
} | |
}; | |
return new Func1<Observable<? extends Throwable>, Observable<?>>() { | |
@Override | |
public Observable<?> call(Observable<? extends Throwable> observable) { | |
return observable.zipWith(attemptRange, zipFunction) | |
.map(attemptToDelayMap) | |
.flatMap(new DelayFunction()); | |
} | |
}; | |
} | |
public static final Func1<Integer, Integer> ONE_SECOND_DELAY_PER_ATTEMPT_STRATEGY | |
= new SecondsMultiplierBackoffStrategy(1); | |
/** | |
* Backoff for a certain number of seconds multiplied by each attempt. | |
* | |
* Eg. If the multiplier is set to '3': | |
* | |
* <table> | |
* <tr> | |
* <td align="center">Attempt</td> | |
* <td align="center">Delay</td> | |
* </tr> | |
* <tr> | |
* <td align="center">1</td> | |
* <td align="center">3 seconds</td> | |
* </tr> | |
* <tr> | |
* <td align="center">2</td> | |
* <td align="center">6 seconds</td> | |
* </tr> | |
* <tr> | |
* <td align="center">3</td> | |
* <td align="center">9 seconds</td> | |
* </tr> | |
* <tr> | |
* <td align="center">4</td> | |
* <td align="center">12 seconds</td> | |
* </tr> | |
* </table> | |
* | |
* | |
*/ | |
public static class SecondsMultiplierBackoffStrategy implements Func1<Integer, Integer> { | |
private final int multiplierPerAttempt; | |
public SecondsMultiplierBackoffStrategy(int multiplierPerAttempt) { | |
this.multiplierPerAttempt = multiplierPerAttempt; | |
} | |
@Override | |
public Integer call(Integer attemptNumber) { | |
return attemptNumber * multiplierPerAttempt; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment