Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active July 18, 2016 13:39
Show Gist options
  • Save atamborrino/93c76ed9e3c165feb13c to your computer and use it in GitHub Desktop.
Save atamborrino/93c76ed9e3c165feb13c to your computer and use it in GitHub Desktop.
package com.samsung.sami.kafka.utils
import akka.actor.Scheduler
import org.slf4j.Logger
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal
case class TimeoutException(msg: String) extends RuntimeException(msg)
object FutureUtils {
def withTimeout[A](delay: FiniteDuration)
(fut: Future[A])
(implicit ec: ExecutionContext, scheduler: Scheduler, log: Logger): Future[A] = {
val delayed = akka.pattern.after(delay, scheduler) {
Future.failed(TimeoutException(s"Timeout after $delay"))
}
Future.firstCompletedOf(Seq(fut, delayed))
}
def retryWithBackoff[A](retryMinInterval: FiniteDuration,
retryMaxInterval: FiniteDuration,
keepTryingAtMaxInterval: Boolean,
logContext: String = "")
(fut: => Future[A])
(implicit scheduler: Scheduler, ec: ExecutionContext, log: Logger): Future[A] = {
def computeBackoff(currentNbRetries: Int): FiniteDuration =
(Math.pow(2, currentNbRetries) * retryMinInterval).asInstanceOf[FiniteDuration]
def retry(nbRetries: Int): Future[A] = {
fut.recoverWith {
case NonFatal(failure) =>
val newBackoff = computeBackoff(nbRetries)
if (newBackoff <= retryMaxInterval) {
log.warn(s"Failure with context: $logContext. Will retry with backoff $newBackoff", failure)
akka.pattern.after(newBackoff, scheduler) { retry(nbRetries + 1) }
} else if (keepTryingAtMaxInterval) {
log.error(s"Failure with context: $logContext. Will retry with backoff $newBackoff", failure)
akka.pattern.after(newBackoff, scheduler) { retry(nbRetries) }
} else {
Future.failed(failure)
}
}
}
retry(0)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment