Last active
April 17, 2022 10:27
-
-
Save Horusiath/64d829720b66df16bc59dbf106a8008e to your computer and use it in GitHub Desktop.
Generic persistent gateway for Akka.NET at-least-once-delivery semantics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The delivery mechanism looks like this: if a sender wants to reliably deliver a payload to a
// recipient using at-least-once delivery semantics, it sends that payload, wrapped in a
// DeliverOrder<T>, to the Messenger actor, which is responsible for persistence and redelivery:
//
// +--------+                      +-----------+                    +-----------+
// |        |--(DeliverOrder<T>)-->|           |--(Delivery<T>:1)-->|           |
// |        |                      |           | /* 2nd attempt */  |           |
// | Sender |                      | Messenger |--(Delivery<T>:2)-->| Recipient |
// |        |                      |           |                    |           |
// |        |                      |           |<----(Confirm:2)----|           |
// +--------+                      +-----------+                    +-----------+
/// <summary>
/// Marker interface for the commands declared in this file
/// (<see cref="DeliverOrder{T}"/> and <see cref="Confirm"/>).
/// </summary>
public interface IMessengerCommand
{
}
/// <summary>
/// Command sent from the payload sender to the at-least-once delivery actor, asking it
/// to deliver <see cref="Payload"/> to <see cref="Recipient"/>.
/// </summary>
/// <typeparam name="T">Type of the payload to deliver.</typeparam>
public sealed class DeliverOrder<T> : IMessengerCommand
{
    /// <summary>Actor the payload should be delivered to; never null.</summary>
    public readonly IActorRef Recipient;

    /// <summary>Payload to deliver.</summary>
    public readonly T Payload;

    /// <summary>Creates a delivery request for the given recipient and payload.</summary>
    /// <param name="recipient">Actor that should receive the payload; must not be null.</param>
    /// <param name="payload">Payload to deliver.</param>
    /// <exception cref="System.ArgumentNullException">
    /// Thrown when <paramref name="recipient"/> is null.
    /// </exception>
    public DeliverOrder(IActorRef recipient, T payload)
    {
        // Fail fast at the sender: Messenger dereferences Recipient.Path when it applies the
        // stored event, so a null recipient would otherwise only fail after being persisted.
        if (recipient == null)
        {
            throw new System.ArgumentNullException(nameof(recipient));
        }

        Recipient = recipient;
        Payload = payload;
    }
}
/// <summary>
/// Command sent from the payload recipient to the at-least-once delivery actor to
/// confirm the delivery identified by <see cref="DeliveryId"/>.
/// </summary>
public sealed class Confirm : IMessengerCommand
{
    /// <summary>Identifier of the delivery being confirmed.</summary>
    public readonly long DeliveryId;

    /// <summary>Creates a confirmation for the given delivery identifier.</summary>
    /// <param name="deliveryId">Identifier of the delivery being confirmed.</param>
    public Confirm(long deliveryId)
    {
        this.DeliveryId = deliveryId;
    }
}
/// <summary>
/// Message sent from the at-least-once delivery actor to the payload recipient. It carries
/// the payload together with the delivery identifier of this delivery.
/// </summary>
/// <typeparam name="T">Type of the delivered payload.</typeparam>
public sealed class Delivery<T>
{
    /// <summary>Payload being delivered.</summary>
    public readonly T Payload;

    /// <summary>Identifier of this delivery.</summary>
    public readonly long DeliveryId;

    /// <summary>Wraps a payload with its delivery identifier.</summary>
    /// <param name="payload">Payload being delivered.</param>
    /// <param name="deliveryId">Identifier of this delivery.</param>
    public Delivery(T payload, long deliveryId)
    {
        this.DeliveryId = deliveryId;
        this.Payload = payload;
    }
}
/// <summary>
/// Marker interface for the events declared in this file
/// (<see cref="MessageSent{T}"/> and <see cref="MessageConfirmed"/>).
/// </summary>
public interface IMessengerEvent
{
}
/// <summary>
/// Event recording that a payload was requested to be delivered to a recipient.
/// </summary>
/// <typeparam name="T">Type of the payload.</typeparam>
public sealed class MessageSent<T> : IMessengerEvent
{
    /// <summary>Actor the payload is addressed to.</summary>
    public readonly IActorRef Recipient;

    /// <summary>Payload to deliver.</summary>
    public readonly T Payload;

    /// <summary>Creates the event for the given recipient and payload.</summary>
    /// <param name="recipient">Actor the payload is addressed to.</param>
    /// <param name="payload">Payload to deliver.</param>
    public MessageSent(IActorRef recipient, T payload)
    {
        this.Recipient = recipient;
        this.Payload = payload;
    }
}
/// <summary>
/// Event recording that the delivery identified by <see cref="DeliveryId"/> was confirmed.
/// </summary>
public sealed class MessageConfirmed : IMessengerEvent
{
    /// <summary>Identifier of the confirmed delivery.</summary>
    public readonly long DeliveryId;

    /// <summary>Creates the event for the given delivery identifier.</summary>
    /// <param name="deliveryId">Identifier of the confirmed delivery.</param>
    public MessageConfirmed(long deliveryId)
    {
        this.DeliveryId = deliveryId;
    }
}
/// <summary>
/// Persistent gateway actor: it stores every delivery request and confirmation as an
/// <see cref="IMessengerEvent"/> and drives delivery of <see cref="Delivery{T}"/> messages
/// to recipients through the <c>AtLeastOnceDeliveryActor</c> base class.
/// </summary>
/// <typeparam name="T">Type of the payloads this messenger delivers.</typeparam>
public sealed class Messenger<T> : AtLeastOnceDeliveryActor
{
    // A snapshot of the delivery state is requested once per this many PersistEvent calls.
    private const int SnapshotInterval = 100;

    // PersistEvent calls since the last snapshot request; wraps to 0 at SnapshotInterval.
    private int counter = 0;

    /// <summary>Creates a messenger using the given persistence identifier.</summary>
    /// <param name="persistenceId">Value exposed through <see cref="PersistenceId"/>.</param>
    public Messenger(string persistenceId)
    {
        PersistenceId = persistenceId;
    }

    /// <summary>Persistence identifier supplied to the constructor.</summary>
    public override string PersistenceId { get; }

    /// <summary>
    /// Recovery handler: restores the delivery state from an offered snapshot, or re-applies
    /// a stored <see cref="IMessengerEvent"/>.
    /// </summary>
    /// <returns><c>true</c> when the message was one of the handled kinds.</returns>
    protected override bool ReceiveRecover(object message)
    {
        // try to recover from the latest snapshot if possible
        var offer = message as SnapshotOffer;
        if (offer != null)
        {
            SetDeliverySnapshot((AtLeastOnceDeliverySnapshot)offer.Snapshot);
            return true;
        }

        var stored = message as IMessengerEvent;
        if (stored != null)
        {
            UpdateState(stored);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Command handler for delivery orders, confirmations and snapshot-saved notifications.
    /// </summary>
    /// <returns><c>true</c> when the message was one of the handled kinds.</returns>
    protected override bool ReceiveCommand(object message)
    {
        // sent by the requestor to reliably deliver a message T to the recipient
        var order = message as DeliverOrder<T>;
        if (order != null)
        {
            PersistEvent(new MessageSent<T>(order.Recipient, order.Payload));
            return true;
        }

        // sent by the recipient to confirm that the message was delivered successfully
        var confirm = message as Confirm;
        if (confirm != null)
        {
            PersistEvent(new MessageConfirmed(confirm.DeliveryId));
            return true;
        }

        // sent by the snapshot store to confirm that the snapshot has been saved
        var saved = message as SaveSnapshotSuccess;
        if (saved != null)
        {
            var savedSeqNr = saved.Metadata.SequenceNr;

            // Journal entries up to the confirmed snapshot, and any snapshots older than
            // it, are no longer needed.
            DeleteMessages(savedSeqNr);
            DeleteSnapshots(new SnapshotSelectionCriteria(savedSeqNr - 1));
            return true;
        }

        return false;
    }

    // Persists the event (applying it through UpdateState) and, on every
    // SnapshotInterval-th call, also saves a snapshot of the current delivery state.
    private void PersistEvent(IMessengerEvent e)
    {
        Persist(e, UpdateState);

        counter = (counter + 1) % SnapshotInterval;
        if (counter != 0)
        {
            return;
        }

        SaveSnapshot(GetDeliverySnapshot());
    }

    // Applies a stored event to the at-least-once delivery state.
    private void UpdateState(IMessengerEvent message)
    {
        // once the send request has been stored, start the delivery procedure to the recipient
        var sent = message as MessageSent<T>;
        if (sent != null)
        {
            Deliver(sent.Recipient.Path, id => new Delivery<T>(sent.Payload, id));
            return;
        }

        // once the confirmation has been stored, officially confirm that delivery
        var confirmed = message as MessageConfirmed;
        if (confirmed != null)
        {
            ConfirmDelivery(confirmed.DeliveryId);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment