Last active
November 16, 2016 15:18
-
-
Save PowerMogli/10e5c92933dfeb7f3d4c3af738bf72ad to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
NaipActorSystem = ActorSystem.Create("ScmActorSystem", AkkaSettings.GetConfiguration()); | |
NaipActorSystem.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "webRouter"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class OrderPersistentActor : AtLeastOnceDeliveryReceiveActor | |
{ | |
#region Fields | |
private readonly ActorPath _actorPath = ActorPath.Parse(ActorPaths.WebRoutingActor.Path); | |
private ICancelable _cancelable; | |
#endregion | |
#region Construction | |
/// <summary> | |
/// Initializes a new instance of the <see cref="OrderPersistentActor"/> class. | |
/// </summary> | |
public OrderPersistentActor() | |
{ | |
Connected(); | |
Recover<SnapshotOffer>(offer => SetDeliverySnapshot((global::Akka.Persistence.AtLeastOnceDeliverySnapshot)offer.Snapshot)); | |
} | |
#endregion | |
#region Properties | |
/// <summary> | |
/// Id of the persistent entity for which messages should be replayed. | |
/// </summary> | |
public override string PersistenceId => Context.Self.Path.Name; | |
#endregion | |
#region Private Methods | |
private void Connected() | |
{ | |
IActorRef self = Self; | |
Context.System.ActorSelection(_actorPath) | |
.Ask<ActorIdentity>(new Identify(null)) | |
.PipeTo(self); | |
ReadUnconfirmed(); | |
Command<ChangePrescriptionStatusToIssued>(msg => | |
{ | |
Deliver( | |
_actorPath, | |
messageId => new ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>(new ChangePrescriptionStatusToIssued(msg.OrderId, msg.UserName), messageId)); | |
SaveSnapshot(); | |
}); | |
Command<ChangePrescriptionStatusToNotIssued>(msg => | |
{ | |
Deliver( | |
_actorPath, | |
messageId => new ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>(new ChangePrescriptionStatusToNotIssued(msg.OrderId, msg.UserName, msg.Comment), messageId)); | |
SaveSnapshot(); | |
}); | |
Command<SaveSnapshotSuccess>(snapshotSaved => | |
{ | |
long snapshotSeqNr = snapshotSaved.Metadata.SequenceNr; | |
DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSeqNr - 1)); | |
}); | |
Command<UnconfirmedWarning>(msg => | |
{ | |
foreach (UnconfirmedDelivery unconfirmedDelivery in msg.UnconfirmedDeliveries) | |
{ | |
ConfirmDelivery(unconfirmedDelivery.DeliveryId); | |
} | |
}); | |
Command<ActorIdentity>(actorIdentity => | |
{ | |
if (actorIdentity?.Subject == null) | |
{ | |
Log.Error($"Remote actor '{ActorPaths.WebRoutingActor.Name}' could not be located on scm2.0-server."); | |
Become(Disconnected); | |
return; | |
} | |
Context.Watch(actorIdentity.Subject); | |
}); | |
Command<Terminated>(msg => Become(Disconnected)); | |
Command<ReliableDeliveryAck>(ack => | |
{ | |
ConfirmDelivery(ack.MessageId); | |
DeleteSnapshots(SnapshotSelectionCriteria.Latest); | |
SaveSnapshot(); | |
}); | |
} | |
private void Disconnected() | |
{ | |
ReadUnconfirmed(); | |
Command<UnconfirmedWarning>(msg => | |
{ | |
foreach (UnconfirmedDelivery unconfirmedDelivery in msg.UnconfirmedDeliveries) | |
{ | |
ConfirmDelivery(unconfirmedDelivery.DeliveryId); | |
} | |
}); | |
Command<ChangePrescriptionStatusToIssued>(msg => Stash.Stash()); | |
Command<ChangePrescriptionStatusToNotIssued>(msg => Stash.Stash()); | |
Command<Reconnect>(msg => | |
{ | |
ActorIdentity actorIdentity = Context.System.ActorSelection(_actorPath) | |
.Ask<ActorIdentity>(new Identify(null)) | |
.Result; | |
if (actorIdentity?.Subject == null) | |
{ | |
return; | |
} | |
_cancelable.Cancel(); | |
Become(Connected); | |
Stash.UnstashAll(); | |
}); | |
_cancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(10), Self, new Reconnect(), Self); | |
} | |
private void ReadUnconfirmed() | |
{ | |
Command<ReadUnconfirmedOrderMessages>(msg => | |
{ | |
global::Akka.Persistence.AtLeastOnceDeliverySnapshot deliverySnapshot = GetDeliverySnapshot(); | |
IEnumerable<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>> issuedMessages = | |
deliverySnapshot.UnconfirmedDeliveries.Select(x => x.Message) | |
.OfType<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>>(); | |
IEnumerable<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>> notIssuedMessages = | |
deliverySnapshot.UnconfirmedDeliveries.Select(x => x.Message) | |
.OfType<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>>(); | |
List<IChangePrescriptionStatusMessage> messages = new List<IChangePrescriptionStatusMessage>(); | |
messages.AddRange(issuedMessages.Select(x => x.Message)); | |
messages.AddRange(notIssuedMessages.Select(x => x.Message)); | |
Sender.Tell(new UnconfirmedOrderMessages(messages), Self); | |
}); | |
} | |
private void SaveSnapshot() | |
{ | |
global::Akka.Persistence.AtLeastOnceDeliverySnapshot snapshot = GetDeliverySnapshot(); | |
SaveSnapshot(snapshot); | |
} | |
#endregion | |
private class Reconnect | |
{ | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class OrderActor : ReceiveActor | |
{ | |
private static OrderHandler _orderHandler; | |
/// <summary> | |
/// Initializes a new instance of the <see cref="OrderActor" /> class. | |
/// </summary> | |
public OrderActor(INaipLog log, OrderHandler orderHandler) | |
{ | |
_orderHandler = orderHandler; | |
Receive<ReliableDeliveryEnvelope<ChangePrescriptionStatusToIssued>>(msg => | |
{ | |
log.Info(LogPriority.Normal, $"OrderActor received ChangePrescriptionStatusToIssued message for order id {msg.Message.OrderId}"); | |
try | |
{ | |
SaveWebOrderPrescriptionStatusIssued(msg.Message); | |
Sender.Tell(new ReliableDeliveryAck(msg.MessageId)); | |
} | |
catch (System.Exception exception) | |
{ | |
log.Exception(exception); | |
} | |
}); | |
Receive<ReliableDeliveryEnvelope<ChangePrescriptionStatusToNotIssued>>(msg => | |
{ | |
log.Info(LogPriority.Normal, $"OrderActor received ChangePrescriptionStatusToNotIssued message for order id {msg.Message.OrderId}"); | |
try | |
{ | |
SaveWebOrderPrescriptionStatusNotIssued(msg.Message); | |
Sender.Tell(new ReliableDeliveryAck(msg.MessageId)); | |
} | |
catch (System.Exception exception) | |
{ | |
log.Exception(exception); | |
} | |
}); | |
} | |
private static void SaveWebOrderPrescriptionStatusIssued(ChangePrescriptionStatusToIssued message) | |
{ | |
_orderHandler.SetPrescriptionStatusIssued(message.OrderId); | |
} | |
private static void SaveWebOrderPrescriptionStatusNotIssued(ChangePrescriptionStatusToNotIssued message) | |
{ | |
_orderHandler.SetPrescriptionStatusNotIssued(message.OrderId, message.UserName, message.Comment); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Program.NaipActorSystem.ActorOf(Props.Create(() => new OrderActor(Log.Akka, this)), "orderActor"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment