Created
December 6, 2019 23:58
-
-
Save tzachz/da27f44929dd2103de6eb9c669fbab2e to your computer and use it in GitHub Desktop.
ExponentialBackoffRetry.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tzachz.retry | |
import akka.actor.ActorSystem | |
import akka.pattern.Patterns.after | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.reflect.ClassTag | |
/** | |
* Created by tzachz on 12/6/19 | |
*/ | |
trait ExponentialBackoffRetry { | |
implicit val ec: ExecutionContext | |
implicit val actorSystem: ActorSystem | |
/**** | |
* Attempts to run "request" as long as it fails with an exception of type "RE" for which "isRecoverable" is true | |
* @param isRecoverable - checks whether the failure should be retried | |
* @param request - action to run | |
* @param config - initial delay and factor configuration for exponential retry | |
* @tparam T - type return from request | |
* @tparam RE - expected type of recoverable exceptions | |
* @return Future[T] with either a successful result or a non-recoverable failure | |
*/ | |
def withRetries[T, RE <: Throwable : ClassTag](isRecoverable: RE => Boolean) | |
(request: () => Future[T]) | |
(implicit config: ExponentialBackoffConfig): Future[T] = { | |
def iteration(currentDelay: FiniteDuration, attempt: Int): Future[T] = request.apply().recoverWith { | |
case e: RE if isRecoverable(e) => | |
val next: FiniteDuration = if (attempt == 0) currentDelay else currentDelay * config.factor | |
logger.warn(s"Recoverable failure detected; Attempt = $attempt; Retrying in $next...", e) | |
after(next, actorSystem.scheduler, ec, () => iteration(next, attempt + 1)) | |
} | |
iteration(config.initialDelay, 0) | |
} | |
} | |
case class ExponentialBackoffConfig(initialDelay: FiniteDuration = 1.second, factor: Long = 2) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tzachz.retry | |
import java.util.concurrent.Executors.newCachedThreadPool | |
import akka.actor.ActorSystem | |
import scala.concurrent.ExecutionContext.fromExecutor | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.util.Random | |
case class NetworkException(code: Int) extends Throwable | |
class Service extends ExponentialBackoffRetry { | |
override implicit val ec: ExecutionContext = fromExecutor(newCachedThreadPool()) | |
override implicit val actorSystem: ActorSystem = ActorSystem(this.getClass.getSimpleName) | |
implicit val config: ExponentialBackoffConfig = ExponentialBackoffConfig(initialDelay = 10.millis) | |
val isRecoverable: NetworkException => Boolean = _.code == 500 // let's say only 500 errors are recoverable | |
def makeRequest(): Future[String] = withRetries(isRecoverable) { | |
() => networkRequest() | |
} | |
def networkRequest(): Future[String] = { | |
val random = Random.nextFloat * 100 | |
if (random < 10) // low prob (10%): unrecoverable error | |
Future.failed(NetworkException(code = 400)) | |
else if (random < 80) // high prob (70%): recoverable error | |
Future.failed(NetworkException(code = 500)) | |
else // low prob (20%): success | |
Future.successful("<data>") | |
} | |
} | |
// try running this and see what happens :) | |
println(Await.ready(new Service().makeRequest(), 10.seconds)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment