Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active November 17, 2016 14:55
Show Gist options
  • Save atamborrino/018e04c1277c722c5e3fe2efa3767e79 to your computer and use it in GitHub Desktop.
Save atamborrino/018e04c1277c722c5e3fe2efa3767e79 to your computer and use it in GitHub Desktop.
Helpers for Scala's Futures
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import akka.actor.Scheduler
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
object FutureHelper {
def withTimeout[A](delay: FiniteDuration)
(fut: Future[A])
(implicit ec: ExecutionContext, scheduler: Scheduler): Future[A] = {
val delayed = akka.pattern.after(delay, scheduler) {
Future.failed(ProcessingTimeoutException(s"Timeout after $delay"))
}
Future.firstCompletedOf(Seq(fut, delayed))
}
def retryWithBackoff[A](retryMinInterval: FiniteDuration,
retryMaxInterval: FiniteDuration,
keepTryingAtMaxInterval: Boolean,
logContext: String = "")
(fut: => Future[A])
(implicit scheduler: Scheduler, ec: ExecutionContext, log: Logger): Future[A] = {
def computeBackoff(currentNbRetries: Int): FiniteDuration =
(Math.pow(2, currentNbRetries) * retryMinInterval).asInstanceOf[FiniteDuration]
def retry(nbRetries: Int): Future[A] = {
fut.recoverWith {
case NonFatal(failure) =>
val newBackoff = computeBackoff(nbRetries)
if (newBackoff <= retryMaxInterval) {
log.warn(s"Failure with context: $logContext. Will retry with backoff $newBackoff", failure)
akka.pattern.after(newBackoff, scheduler) { retry(nbRetries + 1) }
} else if (keepTryingAtMaxInterval) {
log.error(s"Failure with context: $logContext. Will retry with backoff $retryMaxInterval", failure)
akka.pattern.after(retryMaxInterval, scheduler) { retry(nbRetries) }
} else {
Future.failed(failure)
}
}
}
retry(0)
}
def firstSuccessfulOf[A](futures: Seq[Future[A]])(implicit ec: ExecutionContext): Future[A] = {
val total = futures.length
val counter = new AtomicInteger(total)
val p = Promise[A]()
futures.foreach { fut =>
fut.onComplete {
case Success(a) => p.trySuccess(a)
case Failure(failure) =>
if (counter.decrementAndGet() == 0) p.failure(failure)
}
}
p.future
}
def traverseSequential[A, B](as: Seq[A])(f: A => Future[B])
(implicit ec: ExecutionContext): Future[Seq[B]] = {
as.foldLeft(Future.successful(Vector.empty[B])) { (futAcc, a) =>
for {
acc <- futAcc
b <- f(a)
} yield acc :+ b
}
}
// Traverse with a max parallelism
def traverse[A, B](as: Seq[A], parallelism: Int)(f: A => Future[B])
(implicit ec: ExecutionContext, mat: ActorMaterializer): Future[Seq[B]] = {
Source(as.toList)
.mapAsync(parallelism)(f)
.runWith(Sink.seq)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment