Skip to content

Instantly share code, notes, and snippets.

@hoc081098
Created August 25, 2020 16:41
Show Gist options
  • Save hoc081098/9236958210c1648e5085ffda5e48480d to your computer and use it in GitHub Desktop.
Save hoc081098/9236958210c1648e5085ffda5e48480d to your computer and use it in GitHub Desktop.
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.pow
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.milliseconds
/**
* @param backoffDelay calculate delay on nth attempt
* @param predicate conditional retry.
* @param scheduler the [Scheduler] used in [Observable.timer].
*/
@ExperimentalTime
fun <T : Any> Observable<T>.retryWhenWithExponentialBackoff(
backoffDelay: (error: Throwable, attempt: Int) -> Single<Duration>,
predicate: (error: Throwable, attempt: Int) -> Single<Boolean>,
scheduler: Scheduler
): Observable<T> = retryWhen { errorObservable ->
errorObservable
.scan(null as Throwable? to -1) { acc, error -> error to acc.second + 1 }
.skip(1)
.flatMap { (error, attempt) ->
predicate(error!!, attempt)
.flatMapObservable { shouldRetry ->
if (shouldRetry) {
backoffDelay(error, attempt)
.map { error to it }
.toObservable()
} else {
Observable.just(error to null)
}
}
}
.flatMap { (error, delay) ->
if (delay != null) {
Observable.timer(delay.toLongMilliseconds(), TimeUnit.MILLISECONDS, scheduler)
} else {
Observable.error(error)
}
}
}
@ExperimentalTime
fun <T : Any> Observable<T>.retryWhenWithExponentialBackoff(
initialDelay: Duration,
factor: Double,
maxDelay: Duration,
maxRetries: Int,
predicate: (error: Throwable, attempt: Int) -> Boolean = { _, _ -> true }
): Observable<T> = retryWhenWithExponentialBackoff(
backoffDelay = { _, attempt ->
(initialDelay * factor.pow(attempt))
.coerceAtMost(maxDelay)
.also { println("${Date()}: Retry ${attempt + 1}, delay $it") }
.let { Single.just(it) }
},
predicate = { error, attempt -> Single.just(attempt < maxRetries && predicate(error, attempt)) },
scheduler = Schedulers.computation()
)
@ExperimentalTime
fun main() {
val count = AtomicInteger(1)
val expected = 5
Observable
.defer {
println("${Date()}: Execute: $count")
if (count.getAndIncrement() == expected) {
Observable
.timer(500, TimeUnit.MILLISECONDS)
.map { Random.nextInt() }
} else {
error("Unexpected error!")
}
}
.retryWhenWithExponentialBackoff(
initialDelay = 1_000.milliseconds,
factor = 2.0,
maxDelay = Duration.INFINITE,
maxRetries = 3
)
.subscribeBy(
onNext = { println("${Date()}: >> NEXT $it") },
onError = { println("${Date()}: >> ERROR $it") }
)
Thread.sleep(10_000)
}
@hoangchungk53qx1
Copy link

bros

@hoc081098
Copy link
Author

@hoangchungk53qx1 chaof bajn

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment