Last active
February 21, 2019 14:03
-
-
Save nachinius/888bb87ec67fcf1e2d97d81b0612d5e7 to your computer and use it in GitHub Desktop.
Scala Futures: a) serialized (one after the other, and stop on failures), b) with timeout, c) retry with delay
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeoutException | |
import java.util.{Timer, TimerTask} | |
import play.api.Logger | |
import scala.collection.generic.CanBuildFrom | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
import scala.util.{Failure, Random, Success} | |
object FutureUtils { | |
/**
 * Runs `fn` over every element of `collection` strictly one after the other (never in
 * parallel) and collects the results in the original order.
 *
 * The next element is only started once the previous future has completed successfully.
 * If one invocation fails, the remaining elements are never started and the returned
 * future fails with that error.
 *
 * Based on https://www.michaelpollmeier.com/execute-scala-futures-in-serial-one-after-the-other-non-blocking
 *
 * @param collection elements to process, in iteration order
 * @param fn         asynchronous operation to apply to each element
 * @param ec         execution context used to chain the futures
 * @param cbf        builder factory for the resulting collection
 * @return a future holding the results in input order, or the first failure
 */
def serialize[A, B, C[A] <: Iterable[A]]
  (collection: C[A])(fn: A => Future[B])(
  implicit ec: ExecutionContext,
  cbf: CanBuildFrom[C[B], B, C[B]]): Future[C[B]] = {
  val builder = cbf()
  builder.sizeHint(collection.size)
  // The builder already exists, so wrap it in a completed future instead of
  // scheduling a task on the execution context just to return it.
  val start = Future.successful(builder)
  collection.foldLeft(start) { (accumulated, element) =>
    for {
      results <- accumulated
      // fn is only invoked after the previous step succeeded, which is what
      // makes the processing sequential and stops it on the first failure.
      value <- fn(element)
    } yield results += value
  }.map(_.result())
}
/**
 * Timer shared by every call to `futureWithTimeout`.
 *
 * See http://justinhj.github.io/2017/07/16/future-with-timeout.html
 * and https://stackoverflow.com/a/45272591/159291
 *
 * A single `java.util.Timer` is thread safe and scales to thousands of scheduled tasks,
 * so one instance serves all timeouts. It is created as a daemon so that pending
 * timeout tasks never keep the program from shutting down.
 */
val timer: Timer = {
  val runAsDaemon = true
  new Timer(runAsDaemon)
}
/**
 * http://justinhj.github.io/2017/07/16/future-with-timeout.html
 * https://stackoverflow.com/a/45272591/159291
 *
 * Returns the outcome of the provided future if it completes within `timeout`, otherwise
 * a failure carrying a `TimeoutException` - whichever happens first.
 *
 * The timeout is scheduled on the shared `timer`, so no thread is blocked while waiting
 * (unlike a `Thread.sleep`). The underlying future is not cancelled on timeout; only its
 * late result is ignored.
 *
 * Any failure of `future` is propagated unchanged, including throwables that are not
 * `Exception`s (for example `NotImplementedError`).
 *
 * @param future  future whose outcome should be returned
 * @param timeout time after which a `TimeoutException` is returned instead of the outcome
 * @param report  builds the message of the `TimeoutException`; only evaluated when the
 *                timeout actually fires
 * @param ec      execution context used to observe the completion of `future`
 * @return a future completed with the outcome of `future` or failed with a `TimeoutException`
 */
def futureWithTimeout[T](future: Future[T], timeout: FiniteDuration, report: () => String)(implicit ec: ExecutionContext): Future[T] = {
  // Completed by whichever comes first: the caller's future or the timer task.
  val promise = Promise[T]()

  val timerTask = new TimerTask {
    def run(): Unit = {
      // An exception escaping a TimerTask kills the shared timer thread and makes every
      // later timer.schedule call fail, so a broken report() must never propagate.
      val message =
        try report()
        catch {
          case scala.util.control.NonFatal(e) =>
            s"Future timed out after $timeout (building the timeout report failed: $e)"
        }
      promise.tryFailure(new TimeoutException(message))
    }
  }

  timer.schedule(timerTask, timeout.toMillis)

  // Forward success and every kind of failure; the timer task is cancelled only if this
  // outcome won the race against the timeout.
  future.onComplete { outcome =>
    if (promise.tryComplete(outcome)) {
      timerTask.cancel()
    }
  }

  promise.future
}
object RetryDelays { | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.Future | |
import akka.pattern.after | |
import akka.actor.Scheduler | |
// based on https://gist.github.com/viktorklang/9414163
/**
 * Evaluates `f` and, while it fails and attempts remain, evaluates it again after a delay.
 *
 * Each retry waits for the head of `delay`; once `delay` is exhausted `defaultDelay` is
 * used instead. `onDelay` is invoked with the failure before every retry is scheduled.
 * When `retries` reaches zero the last failure is returned as is.
 *
 * @param f            by-name future, re-evaluated on every attempt
 * @param delay        delays to wait before the successive retries
 * @param retries      number of retries still allowed
 * @param defaultDelay delay used when `delay` has no more elements
 * @param onDelay      callback receiving the failure that triggers a retry
 */
def retry[T](
  f: => Future[T],
  delay: Seq[FiniteDuration],
  retries: Int,
  defaultDelay: FiniteDuration,
  onDelay: Throwable => Unit)(implicit ec: ExecutionContext, s: Scheduler): Future[T] =
  f.recoverWith {
    case failure if retries > 0 =>
      onDelay(failure)
      val remainingDelays = delay.drop(1)
      val pause = delay.headOption.getOrElse(defaultDelay)
      after(pause, s)(retry(f, remainingDelays, retries - 1, defaultDelay, onDelay))
  }
/** Helpers for building the `delay` sequence passed to `retry`. */
object Delays {

  /**
   * Fits `delays` to exactly `retries` entries: surplus delays are dropped and missing
   * ones are filled with `default`. A non-positive `retries` yields an empty list.
   *
   * @param delays  explicitly configured delays
   * @param retries number of delays wanted
   * @param default delay used to pad the list
   */
  def withDefault(delays: List[FiniteDuration], retries: Int, default: FiniteDuration): List[FiniteDuration] =
    delays.take(retries).padTo(retries, default)

  /**
   * Scales every delay by an independent random factor drawn from
   * `[minJitter, maxJitter)` (when `minJitter <= maxJitter`).
   *
   * The result is typed as `Seq[FiniteDuration]` so it can be passed straight to `retry`;
   * `FiniteDuration * Double` is only typed as `Duration`.
   *
   * @param delays    base delays
   * @param maxJitter upper bound of the multiplication factor
   * @param minJitter lower bound of the multiplication factor
   * @throws IllegalArgumentException if a scaled delay is not finite (infinite or NaN factor)
   */
  def withJitter(delays: Seq[FiniteDuration], maxJitter: Double, minJitter: Double): Seq[FiniteDuration] =
    delays.map { base =>
      val factor = minJitter + (maxJitter - minJitter) * Random.nextDouble()
      (base * factor) match {
        case scaled: FiniteDuration => scaled
        case other =>
          throw new IllegalArgumentException(
            s"Jitter factor $factor turned $base into the non-finite duration $other")
      }
    }

  /**
   * Infinite, lazily evaluated Fibonacci-like sequence of delays:
   * 0 ms, 200 ms, 200 ms, 400 ms, 600 ms, 1000 ms, ...
   */
  val fibonacci: Stream[FiniteDuration] =
    0.milliseconds #:: 200.milliseconds #:: (fibonacci zip fibonacci.tail).map { case (a, b) => a + b }
}
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
`
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration._
// Spec skeleton for the retry helper; BaseSpec is declared elsewhere in the project.
class FutureUtilsTest extends BaseSpec {
"A retry future" must {
"be reattempted as many times as requested" in {
// TODO: the body is empty, so nothing is asserted yet - count the invocations of a
// failing future passed to retry and compare the count with the requested retries.
}
}`