Last active
November 17, 2016 14:55
-
-
Save atamborrino/018e04c1277c722c5e3fe2efa3767e79 to your computer and use it in GitHub Desktop.
Helpers for Scala's Futures
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success} | |
import scala.util.control.NonFatal | |
import akka.actor.Scheduler | |
import java.util.concurrent.atomic.AtomicInteger | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl._ | |
/** Helpers for composing Scala `Future`s: timeouts, retries with exponential backoff,
  * "first success wins" racing, and sequential / bounded-parallel traversal.
  */
object FutureHelper {

  /** Guards `fut` with a timeout.
    *
    * The returned future mirrors the outcome of `fut` if it completes within `delay`;
    * otherwise it fails with a `ProcessingTimeoutException`. This method does not attempt
    * to cancel or interrupt `fut` itself.
    *
    * @param delay     maximum time to wait for `fut`
    * @param fut       the future to guard
    * @param ec        execution context used for the completion callback and the timer task
    * @param scheduler scheduler used to arm the timeout
    * @return a future completed by whichever happens first: `fut` completing or the timeout
    */
  def withTimeout[A](delay: FiniteDuration)
                    (fut: Future[A])
                    (implicit ec: ExecutionContext, scheduler: Scheduler): Future[A] = {
    val result = Promise[A]()
    val timeoutTask = scheduler.scheduleOnce(delay) {
      result.tryFailure(ProcessingTimeoutException(s"Timeout after $delay"))
      ()
    }
    fut.onComplete { outcome =>
      // Cancel the pending timer so it does not linger in the scheduler until `delay` elapses.
      timeoutTask.cancel()
      result.tryComplete(outcome)
    }
    result.future
  }

  /** Runs `fut`, re-evaluating it with exponential backoff each time it fails non-fatally.
    *
    * The n-th retry (0-based) waits `retryMinInterval * 2^n`. Once that value would exceed
    * `retryMaxInterval`, the behaviour depends on `keepTryingAtMaxInterval`: when `true`,
    * retries continue indefinitely every `retryMaxInterval`; when `false`, the last failure
    * is returned.
    *
    * @param retryMinInterval        backoff used for the first retry
    * @param retryMaxInterval        upper bound for the backoff
    * @param keepTryingAtMaxInterval whether to keep retrying once the upper bound is reached
    * @param logContext              free text included in the retry log messages
    * @param fut                     by-name future, re-evaluated on every attempt
    * @return the first successful result, or the last failure when retries are exhausted
    */
  def retryWithBackoff[A](retryMinInterval: FiniteDuration,
                          retryMaxInterval: FiniteDuration,
                          keepTryingAtMaxInterval: Boolean,
                          logContext: String = "")
                         (fut: => Future[A])
                         (implicit scheduler: Scheduler, ec: ExecutionContext, log: Logger): Future[A] = {
    // Largest exponent for which 2^n is still a finite Double; keeps the product below from
    // ever becoming NaN (e.g. Infinity * 0 when retryMinInterval is zero).
    val maxExponent = 1023

    // Some(backoff) while retryMinInterval * 2^n stays within retryMaxInterval, None otherwise.
    // Computed on raw nanoseconds so that an overflowing product is reported as "exceeds the
    // maximum" instead of throwing or requiring an unchecked cast.
    def backoffWithinMax(currentNbRetries: Int): Option[FiniteDuration] = {
      val nanos = math.pow(2, math.min(currentNbRetries, maxExponent)) * retryMinInterval.toNanos
      if (nanos > Long.MaxValue.toDouble || nanos < Long.MinValue.toDouble) None
      else Some(Duration.fromNanos(math.round(nanos))).filter(_ <= retryMaxInterval)
    }

    // Evaluates the by-name future; a synchronous throw is treated like a failed future so
    // that it goes through the same retry logic.
    def attempt(): Future[A] =
      try fut catch { case NonFatal(e) => Future.failed(e) }

    def retry(nbRetries: Int): Future[A] =
      attempt().recoverWith {
        case NonFatal(failure) =>
          backoffWithinMax(nbRetries) match {
            case Some(newBackoff) =>
              log.warn(s"Failure with context: $logContext. Will retry with backoff $newBackoff", failure)
              akka.pattern.after(newBackoff, scheduler) { retry(nbRetries + 1) }
            case None if keepTryingAtMaxInterval =>
              log.error(s"Failure with context: $logContext. Will retry with backoff $retryMaxInterval", failure)
              // nbRetries is deliberately not incremented: the backoff is already capped.
              akka.pattern.after(retryMaxInterval, scheduler) { retry(nbRetries) }
            case None =>
              Future.failed(failure)
          }
      }

    retry(0)
  }

  /** Returns the result of the first future in `futures` that completes successfully.
    *
    * If every future fails, the returned future fails with the failure that was observed
    * last. If `futures` is empty, the returned future fails with a `NoSuchElementException`
    * instead of never completing.
    *
    * @param futures candidates to race
    * @return the first successful value, or a failure when none succeeds
    */
  def firstSuccessfulOf[A](futures: Seq[Future[A]])(implicit ec: ExecutionContext): Future[A] =
    if (futures.isEmpty) {
      Future.failed(new NoSuchElementException("firstSuccessfulOf: no futures provided"))
    } else {
      // Counts the futures that have not failed yet; reaching zero means all of them failed.
      val remaining = new AtomicInteger(futures.length)
      val p = Promise[A]()
      futures.foreach { fut =>
        fut.onComplete {
          case Success(a) => p.trySuccess(a)
          case Failure(failure) =>
            if (remaining.decrementAndGet() == 0) p.tryFailure(failure)
        }
      }
      p.future
    }

  /** Applies `f` to each element of `as` one after another, invoking `f` on the next
    * element only after the previous future has succeeded.
    *
    * @param as elements to process, in order
    * @param f  asynchronous function applied to each element
    * @return the results in input order; fails with the first failure encountered
    */
  def traverseSequential[A, B](as: Seq[A])(f: A => Future[B])
                              (implicit ec: ExecutionContext): Future[Seq[B]] =
    as.foldLeft(Future.successful(Vector.empty[B])) { (futAcc, a) =>
      for {
        acc <- futAcc
        b <- f(a)
      } yield acc :+ b
    }

  /** Traverses `as` with `f`, running through an Akka Streams `mapAsync` stage with the
    * given `parallelism` and collecting the emitted results into a `Seq`.
    *
    * @param as          elements to process
    * @param parallelism value passed to `mapAsync` as the maximum number of concurrent calls
    * @param f           asynchronous function applied to each element
    * @param ec          not used by the body; kept so existing call sites keep compiling
    * @param mat         materializer used to run the stream
    */
  def traverse[A, B](as: Seq[A], parallelism: Int)(f: A => Future[B])
                    (implicit ec: ExecutionContext, mat: ActorMaterializer): Future[Seq[B]] =
    Source(as.toList)
      .mapAsync(parallelism)(f)
      .runWith(Sink.seq)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment