-
-
Save ZacSweers/90f49486a73f8cf9a500 to your computer and use it in GitHub Desktop.
Exponential Backoff Transformer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @param interval The base interval to start backing off from. The function is: attemptNum^2 * intervalTime | |
* @param units The units for interval | |
* @param retryAttempts The max number of attempts to retry this task or -1 to try MAX_INT times, | |
*/ | |
public static <T> Observable.Transformer<T, T> backoff(final long interval, final TimeUnit units, final int retryAttempts) { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(final Observable<T> observable) { | |
return observable.retryWhen( | |
retryFunc(interval, units, retryAttempts), | |
Schedulers.immediate() | |
); | |
} | |
}; | |
} | |
private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryFunc(final long interval, final TimeUnit units, final int attempts) { | |
return new Func1<Observable<? extends Throwable>, Observable<Long>>() { | |
@Override | |
public Observable<Long> call(Observable<? extends Throwable> observable) { | |
// zip our number of retries to the incoming errors so that we only produce retries | |
// when there's been an error | |
return observable.zipWith( | |
Observable.range(1, attempts > 0 ? attempts : Integer.MAX_VALUE), | |
new Func2<Throwable, Integer, Integer>() { | |
@Override | |
public Integer call(Throwable throwable, Integer attemptNumber) { | |
return attemptNumber; | |
} | |
}) | |
// flatMap the int attempt number to a timer that will wait the specified delay | |
.flatMap(new Func1<Integer, Observable<Long>>() { | |
@Override | |
public Observable<Long> call(final Integer integer) { | |
long newInterval = interval * ((long) integer * (long) integer); | |
if (newInterval < 0) { | |
newInterval = Long.MAX_VALUE; | |
} | |
// use Schedulers#immediate() to keep on same thread | |
return Observable.timer(newInterval, units, Schedulers.immediate()); | |
} | |
}); | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment