Last active
June 6, 2018 14:28
Speculative retries for Scala Futures, inspired by the Cassandra Java driver: http://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/
import akka.actor.Scheduler
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object FutureHelper {
  // ScalaLogger is not imported in this file; it is presumably a project-local logging helper.
  private val log = ScalaLogger.get(this.getClass)

  /**
   * Concurrently retries the async execution ''f'' every ''delay'' as long as no execution has completed.
   * The number of retries is bounded by ''maxSpeculativeExecutions''.
   * The resulting future takes the result of the first async execution of ''f'' to complete.
   *
   * This retry strategy lets you be aggressive (retry fast) on an unusually slow first execution
   * while keeping the result of the first execution if it still completes first despite the concurrent retries.
   * It therefore tries to lower latency at the cost of potentially increasing the load on the called resource,
   * since several executions of ''f'' may run at the same time.
   * Note that the async execution ''f'' should be idempotent if you want a deterministic result despite retries.
   *
   * As an example, if the async execution ''f'' is a call to a service, it makes sense to set ''delay''
   * to the observed p95 latency of the service, and to make sure that each retry calls a different instance
   * of the service.
   *
   * Inspired by http://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/
   * (speculative retries in the Java Cassandra driver)
   */
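  def withSpeculativeRetries[A](delay: FiniteDuration, maxSpeculativeExecutions: Int)
                               (f: => Future[A])
                               (implicit ec: ExecutionContext, scheduler: Scheduler): Future[A] = {
    val p = Promise[A]()
    // Speculative executions are scheduled at delay, 2 * delay, ..., maxSpeculativeExecutions * delay.
    // FiniteDuration * Long is used so that no implicit from scala.concurrent.duration._ is required.
    val executionsTimeouts = (1 to maxSpeculativeExecutions).map(n => delay * n)
    // Initial execution; the promise keeps the result of whichever execution completes first.
    p.tryCompleteWith(f)
    val cancellables = executionsTimeouts.map { executionTimeout =>
      scheduler.scheduleOnce(executionTimeout) {
        log.warn(s"Launching speculative retry after $executionTimeout")
        p.tryCompleteWith(f)
      }
    }
    // Once a result is available, cancel the speculative executions that have not started yet.
    p.future.onComplete { _ => cancellables.foreach(_.cancel()) }
    p.future
  }
}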
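
The behaviour described in the Scaladoc can be tried without an actor system. The following is only a minimal sketch under stated assumptions: it replaces Akka's `Scheduler` with a JDK `ScheduledExecutorService`, and the names `SpeculativeRetryDemo`, `speculative`, `task` and `run` are hypothetical, not part of the gist. The simulated service is slow on its first call and fast afterwards, so the first speculative retry should win.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object SpeculativeRetryDemo {
  // Small helper to build a Runnable from a by-name block.
  private def task(body: => Unit): Runnable = new Runnable { def run(): Unit = body }

  // Hypothetical stand-in for withSpeculativeRetries that uses a JDK scheduler
  // instead of akka.actor.Scheduler (an assumption made to keep the sketch self-contained).
  def speculative[A](delay: FiniteDuration, maxSpeculativeExecutions: Int, timer: ScheduledExecutorService)
                    (f: => Future[A])
                    (implicit ec: ExecutionContext): Future[A] = {
    val p = Promise[A]()
    // Each launch re-evaluates the by-name f; only the first completion is kept.
    def launch(): Unit = f.onComplete(result => p.tryComplete(result))
    launch()
    // Speculative executions at delay, 2 * delay, ..., maxSpeculativeExecutions * delay.
    val scheduled = (1 to maxSpeculativeExecutions).map { n =>
      timer.schedule(task(launch()), (delay * n).toMillis, TimeUnit.MILLISECONDS)
    }
    // Cancel the executions that have not started once a result is available.
    p.future.onComplete(_ => scheduled.foreach(_.cancel(false)))
    p.future
  }

  // Returns (number of the winning attempt, number of attempts started so far).
  def run(): (Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val timer = Executors.newSingleThreadScheduledExecutor()
    val calls = new AtomicInteger(0)

    // Simulated service call: the first attempt takes 2 s, later attempts take 10 ms.
    // Latency is simulated with the timer so that no thread is blocked.
    def call(): Future[Int] = {
      val attempt = calls.incrementAndGet()
      val latencyMs = if (attempt == 1) 2000L else 10L
      val done = Promise[Int]()
      timer.schedule(task { done.success(attempt); () }, latencyMs, TimeUnit.MILLISECONDS)
      done.future
    }

    try {
      val winner = Await.result(speculative(200.millis, 2, timer)(call()), 5.seconds)
      (winner, calls.get())
    } finally {
      timer.shutdownNow()
    }
  }
}
```

With a 200 ms delay, the second attempt starts while the first is still pending and completes well before it, so `SpeculativeRetryDemo.run()` is expected to report attempt 2 as the winner. The third attempt, due at 400 ms, should normally be cancelled before it starts.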