Skip to content

Instantly share code, notes, and snippets.

@jasongoodwin
Last active March 16, 2017 08:15
Show Gist options
  • Save jasongoodwin/6276969 to your computer and use it in GitHub Desktop.
Save jasongoodwin/6276969 to your computer and use it in GitHub Desktop.
Async Akka timeout/retry
package io.pressur
import akka.actor.{ActorRef, Actor}
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.pattern.after
import akka.actor.Status._
/**
 * Request handled by [[Worker]]: ask `actor` with `message` on behalf of
 * whoever sent this request to the worker.
 *
 * @param actor   the actor that will be asked
 * @param message the payload to send to `actor`
 * @tparam T      static type of the payload
 */
case class AskWithTimeout[T](
  actor: ActorRef,
  message: T
)
/**
 * Internal bookkeeping message for one attempt of an [[AskWithTimeout]] request.
 *
 * `AskWithTimeout` is generic, so the field is typed with a wildcard
 * (`AskWithTimeout[_]`): a retry can carry a request with any payload type.
 * Referring to the generic class without type arguments does not compile.
 *
 * @param ask     the request being (re)tried
 * @param sender  the actor that issued the request and should receive the outcome
 * @param attempt number of attempts that have already timed out (0 for the first try)
 */
case class AskWithTimeoutRetry(ask: AskWithTimeout[_], sender: ActorRef, attempt: Int = 0)
/**
 * Failure used to signal that an attempt timed out; it carries the retry
 * message describing the attempt to run next.
 *
 * The type parameter `T` is not referenced by any member of this class.
 *
 * @param askWithRetry the retry to schedule (or give up on) after the timeout
 */
case class AskWithTimeoutException[T](
  askWithRetry: AskWithTimeoutRetry
) extends Exception
/**
 * Raised when a request has been retried too many times.
 *
 * @param message human-readable description, passed through to [[Exception]]
 */
class MaxRetryException(message: String)
  extends Exception(message)
/**
 * Actor that relays a message to another actor with the ask pattern and
 * retries the ask whenever an attempt times out.
 *
 * Protocol:
 *  - `AskWithTimeout(actor, message)` starts a request on behalf of its sender.
 *  - `AskWithTimeoutRetry` is sent by the worker to itself to run one attempt.
 *  - The requester receives `akka.actor.Status.Success(reply)` when the target
 *    answers in time, or `akka.actor.Status.Failure(cause)` when the ask fails
 *    or when the retry budget is exhausted (cause is a [[MaxRetryException]]).
 *
 * @param maxTries number of timed-out attempts after which the request is
 *                 failed instead of being retried
 */
class Worker(maxTries: Int) extends Actor {
  // Execution context for `after`, `firstCompletedOf` and `onComplete`.
  import context.dispatcher
  // `onComplete` hands us a scala.util.Try. These explicit imports shadow the
  // akka.actor.Status cases pulled in by the file-level wildcard import; the
  // Status variants are referenced fully qualified below.
  import scala.util.{Failure, Success}

  /** How long a single attempt may take before it counts as timed out. */
  private val attemptTimeout: FiniteDuration = 1000.millis

  def receive: Actor.Receive = {
    // NOTE: the pattern variables must not be called `ask`; that would shadow
    // the imported `akka.pattern.ask` and break the ask call below.
    case request: AskWithTimeout[_] =>
      // `sender` is read here, on the actor's own thread, and stored in the message.
      self ! AskWithTimeoutRetry(request, sender)
    case retry: AskWithTimeoutRetry =>
      runAttempt(retry)
  }

  /** Runs one attempt and routes its outcome: reply, retry, or give up. */
  private def runAttempt(current: AskWithTimeoutRetry): Unit = {
    val next = current.copy(attempt = current.attempt + 1)

    // Fails after `attemptTimeout`, carrying the retry to run next.
    val deadline = after(attemptTimeout, using = context.system.scheduler)(
      Future.failed(AskWithTimeoutException(next)))

    // The ask itself needs a Timeout; it uses the same duration as the deadline.
    val reply = ask(current.ask.actor, current.ask.message)(akka.util.Timeout(attemptTimeout))

    Future.firstCompletedOf(Seq(reply, deadline)).onComplete {
      case Success(value) =>
        current.sender ! akka.actor.Status.Success(value)
      // Both timers use the same duration, so either may fire first;
      // both mean "this attempt timed out".
      case Failure(_: AskWithTimeoutException[_]) | Failure(_: akka.pattern.AskTimeoutException) =>
        if (next.attempt >= maxTries)
          next.sender ! akka.actor.Status.Failure(
            new MaxRetryException("Too many retries on message: " + next.ask.message))
        else
          self ! next
      case Failure(cause) =>
        current.sender ! akka.actor.Status.Failure(cause)
    }
  }
}
@viktorklang
Copy link

Just as a sidenote: You'll want to avoid putting message class definitions inside the actor class as they will get outer-pointers that may leak the "this" instance of the Actor.

Cheers,

@jasongoodwin
Copy link
Author

Oh thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment