Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active June 6, 2018 14:28
Show Gist options
  • Save atamborrino/aa545a23480b0a8e5a80b8c25d87ac06 to your computer and use it in GitHub Desktop.
Save atamborrino/aa545a23480b0a8e5a80b8c25d87ac06 to your computer and use it in GitHub Desktop.
Speculative retries for Scala's Futures, inspired from Cassandra Java driver http://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/
import akka.actor.Scheduler
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
object FutureHelper {
private val log = ScalaLogger.get(this.getClass)
/**
* Retry concurrently the async execution ''f'' every ''delay'' while no execution completes.
* Number of retries is bounded by ''maxSpeculativeExecutions''.
* The resulting future will be the result of the first async execution ''f'' to complete.
*
* This retry strategy allows to be aggressive (retry fast) on a unusually slow first execution
* while keeping the result of the first execution if it still completes first despite of concurrent retries.
* Thus this retry strategy tries to lower latency at the cost of potentially increasing throughput.
* Note that the ''f'' async execution should be idempotent if you want to have a deterministic result despite retries.
*
* As an example, if the ''f'' async execution is a call to a service, it makes sense to put ''delay''
* at the observed p95 latency of the service, and to make sure that the retry will call a different instance
* of the service.
*
* Inspiration from http://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/
* (speculative retries in the Java Cassandra driver)
*/
def withSpeculativeRetries[A](delay: FiniteDuration, maxSpeculativeExecutions: Int)
(f: => Future[A])
(implicit ec: ExecutionContext, scheduler: Scheduler): Future[A] = {
val p = Promise[A]
val executionsTimeouts = (1 to maxSpeculativeExecutions).map(_ * delay)
p.tryCompleteWith(f)
val cancellables = executionsTimeouts.map { executionTimeout =>
scheduler.scheduleOnce(executionTimeout) {
log.warn(s"Launching speculative retry after $executionTimeout")
p.tryCompleteWith(f)
}
}
p.future.onComplete { _ => cancellables.foreach(_.cancel()) }
p.future
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment