-
-
Save MichaelEvans/4422070f37f4b49b011a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @param intervalMs The base interval to start backing off from. The function is: attemptNum^2 * intervalMs | |
* @param retryAttempts The max number of attempts to retry this task or -1 to try MAX_INT times, | |
*/ | |
public static <T> Observable.Transformer<T, T> backoff(final long intervalMs, final int retryAttempts) { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(final Observable<T> observable) { | |
return observable.retryWhen( | |
retryFunc(intervalMs, retryAttempts), | |
Schedulers.immediate() | |
); | |
} | |
}; | |
} | |
private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryFunc(final long ms, final int attempts) { | |
return new Func1<Observable<? extends Throwable>, Observable<Long>>() { | |
@Override | |
public Observable<Long> call(Observable<? extends Throwable> observable) { | |
// zip our number of retries to the incoming errors so that we only produce retries | |
// when there's been an error | |
return observable.zipWith( | |
Observable.range(1, attempts > 0 ? attempts : Integer.MAX_VALUE), | |
new Func2<Throwable, Integer, Integer>() { | |
@Override | |
public Integer call(Throwable throwable, Integer attemptNumber) { | |
return attemptNumber; | |
} | |
}) | |
// flatMap the int attempt number to a timer that will wait the specified delay | |
.flatMap(new Func1<Integer, Observable<Long>>() { | |
@Override | |
public Observable<Long> call(final Integer integer) { | |
long newInterval = ms * ((long) integer * (long) integer); | |
if (newInterval < 0) { | |
newInterval = Long.MAX_VALUE; | |
} | |
// use Schedulers#immediate() to keep on same thread | |
return Observable.timer(newInterval, TimeUnit.MILLISECONDS, Schedulers.immediate()); | |
} | |
}); | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment