Skip to content

Instantly share code, notes, and snippets.

@Astrac
Created December 28, 2013 13:27
Show Gist options
  • Save Astrac/8159507 to your computer and use it in GitHub Desktop.
Save Astrac/8159507 to your computer and use it in GitHub Desktop.
Akka utility that re-sends a message to an actor up to a maximum number of attempts and returns a single Future covering the whole sequence of tries
package kvstore
import akka.actor._
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
/** Message protocol and `Props` factory for the [[RetryingActor]] class. */
object RetryingActor {

  /**
   * Failure delivered to the requester (wrapped in `Status.Failure`) when the retrying
   * actor gives up.
   *
   * @param attempts value of the actor's attempt counter at the moment it gave up
   */
  case class RetryException(attempts: Int)
    extends Exception(s"Cannot retry after $attempts attempts")

  /** Tick message the retrying actor schedules to itself to trigger the next send. */
  case object Retry

  /**
   * Command that kicks off a retry cycle.
   *
   * @param target      actor the message is sent to
   * @param message     payload sent to `target` on each tick
   * @param rate        interval between consecutive ticks
   * @param maxAttempts upper bound compared against the actor's attempt counter
   * @tparam T type of the payload
   */
  case class Start[T](
    target: ActorRef,
    message: T,
    rate: FiniteDuration,
    maxAttempts: Int
  )

  /**
   * Creation recipe for a [[RetryingActor]].
   *
   * @tparam T not used by the body; retained so existing call sites keep compiling
   */
  def props[T] = Props[RetryingActor]
}
/**
 * Actor that repeatedly sends one message to a target until something answers or the
 * attempt budget runs out.
 *
 * On `Start` it schedules a recurring `Retry` tick to itself (initial delay `0.second`,
 * then every `rate`) and switches to the `retrying` behaviour, remembering the sender of
 * `Start` as the requester. Each tick sends the message to the target again. The first
 * message received that is not `Retry` is passed on to the requester. Once `maxAttempts`
 * sends have been made, the next tick answers the requester with
 * `Status.Failure(RetryException(attempts))`. In both terminal cases the tick is
 * cancelled and the actor stops itself.
 */
class RetryingActor extends Actor {
  import RetryingActor._

  /** Number of times the message has been sent to the target so far. */
  var attempts = 0

  /**
   * Behaviour installed after `Start` has been received.
   *
   * @param requester    actor that receives the final reply or the failure
   * @param subscription handle of the recurring `Retry` tick, cancelled on completion
   * @param target       actor the message is sent to on every tick
   * @param message      payload to send
   * @param rate         tick interval (not read by this method)
   * @param maxAttempts  maximum number of sends before giving up
   */
  def retrying[T](requester: ActorRef, subscription: Cancellable, target: ActorRef, message: T, rate: FiniteDuration, maxAttempts: Int): Receive = {
    case Retry =>
      if (attempts < maxAttempts) {
        // Count every send. Without this increment the guard above never becomes false,
        // so the failure branch is unreachable and the actor keeps ticking indefinitely.
        attempts += 1
        target ! message
      } else {
        finish(requester, subscription, Status.Failure(RetryException(attempts)))
      }

    case response =>
      finish(requester, subscription, response)
  }

  /** Initial behaviour: waits for `Start`, sets up the tick and switches to `retrying`. */
  def receive: Receive = {
    case Start(target, message, rate, maxAttempts) =>
      val subscription = context.system.scheduler.schedule(0.second, rate, self, Retry)(context.system.dispatcher)
      context.become(retrying(sender, subscription, target, message, rate, maxAttempts))
  }

  /** Delivers the final reply to the requester, cancels the tick and stops this actor. */
  private def finish(requester: ActorRef, subscription: Cancellable, reply: Any): Unit = {
    requester ! reply
    subscription.cancel()
    context.stop(self)
  }
}
/** Entry point for asking an actor with automatic retries. */
object RetryPattern {

  /**
   * Creates a [[RetryingActor]] as a child of the implicit `context` and asks it to start
   * retrying `msg` against `actor`.
   *
   * @param actor       actor the message is ultimately sent to
   * @param msg         payload to send
   * @param maxAttempts attempt limit handed to the retrying actor
   * @param rate        interval between attempts
   * @param context     actor context used to spawn the retrying actor
   * @return the result of the `?` (ask) on the retrying actor
   */
  def retry[T](actor: ActorRef, msg: T, maxAttempts: Int, rate: FiniteDuration)(implicit context: ActorContext) = {
    // The ask timeout spans the whole retry window plus one millisecond.
    // NOTE(review): a 1 ms margin looks tight relative to scheduler granularity — confirm
    // whether the ask timeout can win the race against the final RetryException.
    val retryWindow = (rate * maxAttempts) + 1.millis
    implicit val askTimeout: Timeout = Timeout.durationToTimeout(retryWindow)

    val retrier = context.actorOf(RetryingActor.props)
    val startCommand = RetryingActor.Start(actor, msg, rate, maxAttempts)
    retrier ? startCommand
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment