Last active
August 29, 2015 14:05
-
-
Save karthik20522/07e953d88013abc45bdf to your computer and use it in GitHub Desktop.
AKKA.NET RetryActor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Event; | |
namespace SampleActor.Actors | |
{ | |
/// <summary>
/// Wraps a "dangerous" child actor and re-runs the most recently received message
/// against a freshly created child each time the previous child fails.
/// </summary>
/// <remarks>
/// Flow, as implemented below:
/// 1. Any ordinary message is remembered (together with its sender) and forwarded
///    to a newly created child.
/// 2. If the child throws, the supervisor strategy stops it and bumps the retry counter.
/// 3. The resulting <c>Terminated</c> notification arms a receive timeout of
///    <c>retryCount * backOffInterval</c> seconds.
/// 4. When the <c>ReceiveTimeout</c> fires, a new child is created and the remembered
///    message is sent again.
/// 5. Once the retry budget is used up, the failure is escalated to this actor's parent.
/// </remarks>
public class RetryActor : ReceiveActor, ILogReceive
{
    private readonly LoggingAdapter log = Logging.GetLogger(Context);

    // Configuration: fixed at construction time.
    private readonly Props _dangerousActorProp;
    private readonly int _maxAttempts;
    private readonly int _backoffInterval;

    // State of the message currently being (re)tried.
    private object _originalMessage;
    private ActorRef _dangerousActor, _originalSender;
    private int _retryCount;

    /// <summary>
    /// Retry Actor that encapsulates a dangerous actor
    /// </summary>
    /// <param name="dangerActorProp">The dangerous actor props</param>
    /// <param name="maxAttempts">Maximum number of retries (after the initial attempt) before escalating to the parent</param>
    /// <param name="backOffInterval">Pause before retries in seconds; multiplied by the current retry count</param>
    /// <exception cref="ArgumentNullException"><paramref name="dangerActorProp"/> is null.</exception>
    public RetryActor(Props dangerActorProp, int maxAttempts = 5, int backOffInterval = 2)
    {
        // Fail at construction rather than later inside DoWork when the child is created.
        if (ReferenceEquals(dangerActorProp, null))
        {
            throw new ArgumentNullException("dangerActorProp");
        }

        this._maxAttempts = maxAttempts;
        this._backoffInterval = backOffInterval;
        this._dangerousActorProp = dangerActorProp;

        Receive<Terminated>(terminatedActor =>
        {
            // A Terminated from an older child can arrive after the current child has
            // already been cleared (ReceiveAny may have created several children), so
            // the field must be checked before it is dereferenced.
            if (ReferenceEquals(_dangerousActor, null))
            {
                return;
            }

            if (terminatedActor.ActorRef.Path.Name == _dangerousActor.Path.Name)
            { //make sure they are the same actor
                _dangerousActor = null;
                log.Info("Dangerous actor terminated: " + terminatedActor.ActorRef.Path.Name);
                // Linear back-off: the wait grows with the number of retries so far.
                Context.SetReceiveTimeout(TimeSpan.FromSeconds(_retryCount * _backoffInterval));
            }
        });

        Receive<ReceiveTimeout>(actorTimeout =>
        {
            // NOTE(review): intended to switch the timeout off again; confirm against the
            // Akka.NET version in use whether TimeSpan.MaxValue or null is the supported way.
            Context.SetReceiveTimeout(TimeSpan.MaxValue); //reset the timeout
            log.Info("Restarting: " + this._retryCount);
            DoWork(); //restart the process
        });

        ReceiveAny(msg =>
        {
            // Remember message and sender so the work can be replayed on retry.
            this._originalSender = Sender;
            this._originalMessage = msg;
            log.Info("Received Original message: " + msg);
            DoWork(); //do work - call dangerous actor
        });
    }

    /// <summary>
    /// Creates a new dangerous child, watches it, and sends it the remembered message
    /// on behalf of the remembered sender.
    /// </summary>
    private void DoWork()
    {
        //appending datetime ticks to actor name can help in avoiding actor name collision
        string childName = string.Format("dangerActor_{0}_{1}", DateTime.UtcNow.Ticks, this._retryCount);
        _dangerousActor = Context.ActorOf(_dangerousActorProp, childName);
        Context.Watch(_dangerousActor); //watch the actor for terminated message
        _dangerousActor.Tell(_originalMessage, _originalSender);
    }

    /// <summary>
    /// Supervisor strategy when child actor throws an exception
    /// </summary>
    /// <returns>
    /// A one-for-one strategy that stops the failing child and counts a retry, or
    /// escalates once <c>maxAttempts</c> retries have already been made.
    /// </returns>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            decider: x =>
            {
                log.Info("Exception: " + x.Message);

                // ">=" (not ">") so that exactly _maxAttempts retries are allowed.
                if (this._retryCount >= this._maxAttempts)
                {
                    return Directive.Escalate; //escalate the exception if retry attempts are exhausted
                }

                this._retryCount += 1;
                return Directive.Stop; //terminate the failing actor
            });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment