Skip to content

Instantly share code, notes, and snippets.

@nachinius
Last active February 21, 2019 14:03
Show Gist options
  • Save nachinius/888bb87ec67fcf1e2d97d81b0612d5e7 to your computer and use it in GitHub Desktop.
Save nachinius/888bb87ec67fcf1e2d97d81b0612d5e7 to your computer and use it in GitHub Desktop.
Scala Futures: a) serialized (one after the other, and stop on failures), b) with timeout, c) retry with delay
import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}
import play.api.Logger
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Random, Success}
object FutureUtils {
/**
* https://www.michaelpollmeier.com/execute-scala-futures-in-serial-one-after-the-other-non-blocking
* Stop if one failes.
* One after the other (not in parallel)
*/
def serialize[A, B, C[A] <: Iterable[A]]
(collection: C[A])(fn: A => Future[B])(
implicit ec: ExecutionContext,
cbf: CanBuildFrom[C[B], B, C[B]]): Future[C[B]] = {
val builder = cbf()
builder.sizeHint(collection.size)
collection.foldLeft(Future(builder)) {
(previousFuture, next) =>
for {
previousResults <- previousFuture
next <- fn(next)
} yield previousResults += next
} map { builder ⇒ builder.result }
}
//http://justinhj.github.io/2017/07/16/future-with-timeout.html
//https://stackoverflow.com/a/45272591/159291
// All Future's that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down
val timer: Timer = new Timer(true)
/**
* http://justinhj.github.io/2017/07/16/future-with-timeout.html
* https://stackoverflow.com/a/45272591/159291
*
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
*
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future's outcome
* @return Future[T]
*/
def futureWithTimeout[T](future: Future[T], timeout: FiniteDuration, report: () => String)(implicit ec: ExecutionContext): Future[T] = {
// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]
// and a Timer task to handle timing out
val timerTask = new TimerTask() {
def run(): Unit = {
p.tryFailure(new TimeoutException(report()))
}
}
// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)
future.map {
a =>
if (p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if (p.tryFailure(e)) {
timerTask.cancel()
}
}
p.future
}
object RetryDelays {
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.pattern.after
import akka.actor.Scheduler
// based on https://gist.github.com/viktorklang/9414163
def retry[T](
f: => Future[T],
delay: Seq[FiniteDuration],
retries: Int,
defaultDelay: FiniteDuration,
onDelay: Throwable => Unit)(implicit ec: ExecutionContext, s: Scheduler): Future[T] = {
f recoverWith {
case ex if retries > 0 =>
onDelay(ex)
val nextDelay = if(delay.isEmpty) delay else delay.tail
after(delay.headOption.getOrElse(defaultDelay), s)(retry(f, nextDelay, retries - 1, defaultDelay, onDelay))
}
}
object Delays {
def withDefault(delays: List[FiniteDuration], retries: Int, default: FiniteDuration) = {
if (delays.length > retries) {
delays.take(retries)
}
else {
delays ++ List.fill(retries - delays.length)(default)
}
}
def withJitter(delays: Seq[FiniteDuration], maxJitter: Double, minJitter: Double) =
delays.map(_ * (minJitter + (maxJitter - minJitter) * Random.nextDouble))
val fibonacci: Stream[FiniteDuration] = 0.milliseconds #:: 200.milliseconds #:: (fibonacci zip fibonacci.tail).map { t => t._1 + t._2 }
}
}
}
@nachinius
Copy link
Author

nachinius commented Feb 21, 2019

`
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration._

class FutureUtilsTest extends BaseSpec {

"A retry future" must {
"be reattempted as many times as requested" in {

  val a = ArrayBuffer[Int]()

  val fut = FutureUtils.RetryDelays.retry(
    {
      a.append(a.length)
      Future.failed(new Exception("whatever"))
    },
    List.fill(10)(2.milliseconds),
    5,
    1.milliseconds,
    (ex) => println(s"in purpose msg for delaying ${ex}" + a.mkString(","))
  )(actorSystem.dispatcher, actorSystem.scheduler)


  fut.failed.futureValue
  a must have length 5 + 1
}

"return the first success result" in {

  val a = ArrayBuffer[Int](0,0,0,1,0,0)
  val fut = FutureUtils.RetryDelays.retry(
    {
      val element = a.remove(0)
      element match {
        case 0 => Future.failed(new Exception("keep trying"))
        case 1 => Future.successful(1234)
      }
    },
    List.fill(10)(2.milliseconds),
    5,
    1.milliseconds,
    (ex) => println(s"delaying in purpose ${ex}")
  )(actorSystem.dispatcher, actorSystem.scheduler)

  fut.futureValue mustBe 1234
}

}

}`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment