Last active
December 13, 2016 10:02
-
-
Save claudiobernasconi/cd9460cdd39a13eb96887748a40d87ff to your computer and use it in GitHub Desktop.
This Akka.Net Actor implementation works fine in a non-clustered environment. If I enable clustering using a clustered round-robin-pool router on a single node I start getting dead letters after a short period of time which eventually stops executing jobs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using Akka.Actor; | |
using Akka.DI.Core; | |
using Akka.Routing; | |
using Prototype.Common.Messages; | |
using Prototype.Messages; | |
using Prototype.Shared.Logger; | |
namespace Prototype.Actors | |
{ | |
internal class JobCoordinatorActor : ReceiveActor, IWithUnboundedStash | |
{ | |
private readonly ILogger _logger; | |
private readonly IActorRef _workerActor; | |
private JobState _jobState; | |
private string _jobIdentification; | |
public IStash Stash { get; set; } | |
public JobCoordinatorActor(ILogger logger) | |
{ | |
_logger = logger; | |
_workerActor = Context.ActorOf(Context.DI().Props<WorkerActor>() | |
.WithRouter(new RoundRobinPool(10)), "worker"); | |
Ready(); | |
} | |
private void Running() | |
{ | |
Receive<WorkerCompletedMessage>(message => HandleWorkerCompletedMessage(message)); | |
Receive<StartJobMessage>(message => HandleStartJobMessageWhileRunning()); | |
} | |
private void HandleStartJobMessageWhileRunning() | |
{ | |
Stash.Stash(); | |
} | |
private void Ready() | |
{ | |
Receive<StartJobMessage>(message => HandleStartJobMessage(message)); | |
} | |
private void HandleWorkerCompletedMessage(WorkerCompletedMessage message) | |
{ | |
_jobState.JobDetailStates.First(x => x.PersonId == message.PersonId).Processed = true; | |
if (_jobState.JobDetailStates.All(x => x.Processed)) | |
{ | |
_jobState.Timer.Stop(); | |
_logger.Log("Job {0} completed in {1} ms", message.JobIdentification, _jobState.Timer.ElapsedMilliseconds); | |
Context.ActorSelection("akka.tcp://[email protected]:8091/user/JobMonitor") | |
.Tell(new JobMonitorMessage(message.JobIdentification, "Completed", 0, _jobState.Timer.ElapsedMilliseconds)); | |
_logger.Log("JobMonitor completed message sent!"); | |
Become(Ready); | |
Stash.Unstash(); | |
} | |
} | |
private void HandleStartJobMessage(StartJobMessage message) | |
{ | |
_jobIdentification = message.JobIdentification; | |
var jobDetailStates = message.PersonIds.Select(personId => new JobDetailState(personId)).ToList(); | |
_jobState = new JobState(Stopwatch.StartNew(), jobDetailStates); | |
Context.ActorSelection("akka.tcp://[email protected]:8091/user/JobMonitor") | |
.Tell(new JobMonitorMessage(message.JobIdentification, "Running", message.PersonIds.Count())); | |
_logger.Log("JobMonitor running message sent!"); | |
foreach (var personId in message.PersonIds) | |
{ | |
_workerActor.Tell(new StartWorkerMessage(_jobIdentification, message.Periode, personId)); | |
} | |
Become(Running); | |
} | |
private class JobState | |
{ | |
public JobState(Stopwatch timer, IList<JobDetailState> jobDetailStates) | |
{ | |
Timer = timer; | |
JobDetailStates = jobDetailStates; | |
} | |
public IList<JobDetailState> JobDetailStates { get; set; } | |
public Stopwatch Timer { get; set; } | |
} | |
private class JobDetailState | |
{ | |
public JobDetailState(int personId) | |
{ | |
PersonId = personId; | |
} | |
public int PersonId { get; set; } | |
public bool Processed { get; set; } | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Prototype.Messages | |
{ | |
public class StartJobMessage | |
{ | |
public StartJobMessage(string jobIdentification, int periode, int[] personIds) | |
{ | |
Periode = periode; | |
PersonIds = personIds; | |
JobIdentification = jobIdentification; | |
} | |
public string JobIdentification { get; set; } | |
public int Periode { get; private set; } | |
public int[] PersonIds { get; set; } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Prototype.Messages | |
{ | |
public class StartWorkerMessage | |
{ | |
public StartWorkerMessage(string jobIdentification, int periode, int personId) | |
{ | |
JobIdentification = jobIdentification; | |
Periode = periode; | |
PersonId = personId; | |
} | |
public string JobIdentification { get; set; } | |
public int Periode { get; private set; } | |
public int PersonId { get; private set; } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Akka.Actor; | |
using Prototype.Common.Messages; | |
using Prototype.Messages; | |
using Prototype.WorkItems; | |
namespace Prototype.Actors | |
{ | |
internal class WorkerActor : ReceiveActor | |
{ | |
private readonly IWorkItemFactory _workItemFactory; | |
public WorkerActor(IWorkItemFactory workItemFactory) | |
{ | |
_workItemFactory = workItemFactory; | |
Receive<StartWorkerMessage>(message => HandleCalculatePersonSteuerMessage(message)); | |
} | |
private void HandleCalculatePersonSteuerMessage(StartWorkerMessage message) | |
{ | |
var workItem = _workItemFactory.Create<StartWorkerMessage>("Steuerrechnung"); | |
try | |
{ | |
workItem.Process(message); | |
Context.ActorSelection("akka.tcp://[email protected]:8091/user/JobMonitor") | |
.Tell(new JobMonitorProgressMessage(message.JobIdentification, message.PersonId, "Completed", GetActorName())); | |
} | |
catch (Exception ex) | |
{ | |
Context.ActorSelection("akka.tcp://[email protected]:8091/user/JobMonitor") | |
.Tell(new JobMonitorProgressMessage(message.JobIdentification, message.PersonId, "Error", GetActorName(), ex.Message)); | |
} | |
Context.Sender.Tell(new WorkerCompletedMessage(message.JobIdentification, message.PersonId)); | |
} | |
private string GetActorName() | |
{ | |
return Self.Path.ToStringWithAddress().Substring(Self.Path.ToStringWithAddress().IndexOf("/user", StringComparison.Ordinal) + 5); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Prototype.Messages | |
{ | |
public class WorkerCompletedMessage | |
{ | |
public WorkerCompletedMessage(string jobIdentification, int personId) | |
{ | |
PersonId = personId; | |
JobIdentification = jobIdentification; | |
} | |
public string JobIdentification { get; set; } | |
public int PersonId { get; private set; } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment