Last active
June 18, 2020 09:39
-
-
Save codetinkerhack/8206481 to your computer and use it in GitHub Desktop.
Retry Akka actor /Ask pattern with individual timeout, retry intervals
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.codetinkerhack | |
import akka.actor.{ ActorRef, Props, Actor, ActorLogging } | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
import akka.actor.Actor.Receive | |
import akka.pattern.pipe | |
import scala.util.Success | |
import scala.util.Failure | |
object ReTry { | |
private case class Retry(originalSender: ActorRef, message: Any, times: Int) | |
private case class Response(originalSender: ActorRef, result: Any) | |
def props(tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef): Props = Props(new ReTry(tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef)) | |
} | |
class ReTry(val tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef) extends Actor with ActorLogging { | |
import context.dispatcher | |
import ReTry._ | |
// Retry loop that keep on Re-trying the request | |
def retryLoop: Receive = { | |
// Response from future either Success or Failure is a Success - we propagate it back to a original sender | |
case Response(originalSender, result) => | |
originalSender ! result | |
context stop self | |
case Retry(originalSender, message, triesLeft) => | |
// Process (Re)try here. When future completes it sends result to self | |
(forwardTo ? message) (retryTimeOut) onComplete { | |
case Success(result) => | |
self ! Response(originalSender, result) // sending responses via self synchronises results from futures that may come potentially in any order. It also helps the case when the actor is stopped (in this case responses will become deadletters) | |
case Failure(ex) => | |
if (triesLeft - 1 == 0) {// In case of last try and we got a failure (timeout) lets send Retries exceeded error | |
self ! Response(originalSender, Failure(new Exception("Retries exceeded"))) | |
} | |
else | |
log.error("Error occurred: " + ex) | |
} | |
// Send one more retry after interval | |
if (triesLeft - 1 > 0) | |
context.system.scheduler.scheduleOnce(retryInterval, self, Retry(originalSender, message, triesLeft - 1)) | |
case m @ _ => | |
log.error("No handling defined for message: " + m) | |
} | |
// Initial receive loop | |
def receive: Receive = { | |
case message @ _ => | |
self ! Retry(sender, message, tries) | |
// Lets swap to a retry loop here. | |
context.become(retryLoop, false) | |
} | |
} |
Line 51-52 should be in Failure
case, else branch, and not directly after handling of ask pattern, because we want to retry only after we are sure that forwardTo ? message
has failed before retrying.
Ima steal and modify to save some time. Thanks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What is the equivalent java code for this.