-
-
Save atsapura/8defe6bc34dfb60c32afe5382b9ff5c9 to your computer and use it in GitHub Desktop.
Guaranteed delivery actor in Akka.NET/F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module GuaranteedDelivery = | |
open Akka.Actor | |
open Akka.Persistence | |
open Akkling | |
open Akkling.Persistence | |
type Payload = string | |
type DeliveryEnvelope = { Payload: Payload; DeliveryId: int64 } | |
type Cmd = | |
| DeliverMessage of Payload * IActorRef<DeliveryEnvelope> | |
| ConfirmDelivery of int64 | |
type private Evt = | |
| MessageSent of Payload * IActorRef<DeliveryEnvelope> | |
| DeliveryConfirmed of int64 | |
[<RequireQualifiedAccess>] | |
module private Persistence = | |
open System | |
open System.IO | |
open ProtoBuf | |
open Akka.Serialization | |
[<AllowNullLiteral>] | |
type IProtoBufSerializable = interface end | |
[<AllowNullLiteral>] | |
type IProtoBufSerializableEvent = | |
inherit IProtoBufSerializable | |
abstract member Timestamp : DateTimeOffset | |
[<ProtoContract;CLIMutable;NoComparison>] | |
type MessageSent = | |
{ [<ProtoMember(1)>] Payload : string | |
[<ProtoMember(2)>] Recipient : string | |
[<ProtoMember(3)>] Timestamp : DateTimeOffset } | |
interface IProtoBufSerializableEvent with | |
member this.Timestamp = this.Timestamp | |
static member FromDomain(payload : string, recipient : IActorRef<DeliveryEnvelope>) = | |
{ Payload = payload | |
Recipient = Serialization.SerializedActorPath(untyped recipient) | |
Timestamp = DateTimeOffset.Now } | |
member this.ToDomain<'T>(system : ExtendedActorSystem) = | |
this.Payload, system.Provider.ResolveActorRef(this.Recipient) |> typed | |
[<ProtoContract;CLIMutable;NoComparison>] | |
type DeliveryConfirmed = | |
{ [<ProtoMember(1)>] DeliveryId : int64 | |
[<ProtoMember(2)>] Timestamp : DateTimeOffset } | |
interface IProtoBufSerializableEvent with | |
member this.Timestamp = this.Timestamp | |
static member FromDomain(deliveryId : int64) = | |
{ DeliveryId = deliveryId | |
Timestamp = DateTimeOffset.Now } | |
member this.ToDomain() = | |
this.DeliveryId | |
let fromDomain evt : IProtoBufSerializableEvent = | |
match evt with | |
| MessageSent (payload, recipient) -> MessageSent.FromDomain (payload, recipient) :> IProtoBufSerializableEvent | |
| DeliveryConfirmed deliveryId -> DeliveryConfirmed.FromDomain deliveryId :> IProtoBufSerializableEvent | |
let toDomain system (evt : IProtoBufSerializableEvent) = | |
match evt with | |
| :? MessageSent as evt -> Evt.MessageSent (evt.ToDomain system) | |
| :? DeliveryConfirmed as evt -> Evt.DeliveryConfirmed (evt.ToDomain()) | |
| _ -> failwithf "Unsupported event %A" <| evt.GetType() | |
let [<Literal>] MessageSentManifest = "MessageSent" | |
let [<Literal>] DeliveryConfirmedManifest = "DeliveryConfirmed" | |
type ProtobufSerializer (system:Akka.Actor.ExtendedActorSystem) = | |
inherit Akka.Serialization.SerializerWithStringManifest(system) | |
override this.ToBinary(o:obj) = | |
use stream = new MemoryStream() | |
Serializer.Serialize(stream, o) | |
stream.ToArray() | |
override this.FromBinary(bytes:byte[], manifest:string) = | |
use stream = new MemoryStream(bytes) | |
let typ = | |
match manifest with | |
| MessageSentManifest -> typedefof<MessageSent> | |
| DeliveryConfirmedManifest -> typedefof<DeliveryConfirmed> | |
| _ -> notImpl (sprintf "Unknown manifest in DB: %s" manifest) | |
ProtoBuf.Serializer.Deserialize(typ, stream) | |
override this.Manifest(o:obj) = | |
match o with | |
| :? MessageSent -> MessageSentManifest | |
| :? DeliveryConfirmed -> DeliveryConfirmedManifest | |
| _ -> notImpl (sprintf "Unknown type in DB: %s" (o.GetType().FullName)) | |
override this.Identifier with get() = 128 | |
type EventAdapter(system : ExtendedActorSystem) = | |
let ser = ProtobufSerializer(system) | |
interface Akka.Persistence.Journal.IEventAdapter with | |
member __.Manifest(evt : obj) = | |
ser.Manifest(evt) | |
member __.ToJournal(evt : obj) : obj = | |
Akka.Persistence.Journal.Tagged(box evt, [| "any" |]) :> obj | |
member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence = | |
if evt :? IProtoBufSerializable then | |
Akka.Persistence.Journal.EventSequence.Single(evt :?> IProtoBufSerializable) | |
else | |
Akka.Persistence.Journal.EventSequence.Empty | |
let private cleanerActor connectionString (mailbox : Actor<_>) = | |
let rec loop () = | |
actor { | |
let! (msg : SnapshotMetadata) = mailbox.Receive() | |
logDebugf mailbox "Cleaning delivered messages up to [%d]" msg.SequenceNr | |
let commandText = | |
sprintf """DELETE FROM [dbo].[EventJournal] WHERE PersistenceID='%s' AND SequenceNr<=%d; | |
DELETE FROM [dbo].[SnapshotStore] WHERE PersistenceID='%s' AND SequenceNr<=%d""" | |
msg.PersistenceId msg.SequenceNr msg.PersistenceId msg.SequenceNr | |
use connection = new System.Data.SqlClient.SqlConnection(connectionString) | |
connection.Open () | |
let command = connection.CreateCommand () | |
command.CommandText <- commandText | |
command.ExecuteNonQuery () |> ignore | |
ignored () | |
} | |
loop () | |
let private messengerActor connectionString (mailbox : Eventsourced<_>) = | |
let deliverer = AtLeastOnceDelivery.createDefault mailbox | |
let cleaner = spawn mailbox "cleaner" (props <| cleanerActor connectionString) | |
let rec loop () = | |
actor { | |
let! msg = mailbox.Receive() | |
let effect = deliverer.Receive (upcast mailbox) msg | |
if effect.WasHandled() | |
then | |
return effect | |
else | |
match msg with | |
| :? Persistence.IProtoBufSerializableEvent as event -> | |
mailbox.Self <! box (event |> Persistence.toDomain (mailbox.System :?> ExtendedActorSystem)) | |
| Persisted mailbox event when (event :? Persistence.IProtoBufSerializableEvent) -> ignored () | |
| SnapshotOffer snap -> | |
deliverer.DeliverySnapshot <- snap | |
| :? SaveSnapshotSuccess -> ignored () | |
| :? SaveSnapshotFailure as e -> | |
logErrorWithExnf mailbox e.Cause "Failed to save snapshot" | |
ignored () | |
| PersistentLifecycleEvent e -> | |
match e with | |
| ReplayFailed (exn, message) -> | |
logErrorWithExn mailbox exn (string message) | |
ignored () | |
| ReplaySucceed -> | |
ignored () | |
| :? RecoveryCompleted -> | |
ignored () | |
| :? Cmd as cmd -> | |
match cmd with | |
| DeliverMessage(payload, recipient) -> | |
let p = Persistence.fromDomain <| Evt.MessageSent(payload, recipient) | |
return Persist (box p) :> Effect<_> | |
| ConfirmDelivery(deliveryId) -> | |
let p = Persistence.fromDomain <| Evt.DeliveryConfirmed(deliveryId) | |
return Persist (box p) :> Effect<_> | |
| :? Evt as evt -> | |
match evt with | |
| MessageSent(payload, recipient) -> | |
let fac d = { Payload = payload; DeliveryId = d } | |
return deliverer.Deliver(recipient.Path, fac, mailbox.IsRecovering()) |> ignored | |
| DeliveryConfirmed(deliveryId) -> | |
if deliverer.Confirm deliveryId then | |
let snapshotStore = typed mailbox.SnapshotStore | |
snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), deliverer.DeliverySnapshot) | |
if deliverer.DeliverySnapshot.UnconfirmedDeliveries.Length = 0 then | |
cleaner <! SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()) | |
ignored () | |
| LifecycleEvent _ -> ignored () | |
| _ -> | |
logDebugf mailbox "Unable to handle [%s]" (msg.GetType().Name) | |
unhandled () | |
} | |
loop () | |
let guaranteedDeliveryActor<'Payload> connectionString getPersistentActorName deliveryActorProps (mailbox : Actor<_>) = | |
let actorName = getPersistentActorName () | |
let messenger = retype (spawn mailbox actorName (propsPersist <| messengerActor connectionString)) | |
let deliverer = spawn mailbox "deliverer" <| props deliveryActorProps | |
let rec loop () = | |
actor { | |
let! (message : 'Payload) = mailbox.Receive() | |
messenger <! DeliverMessage (serializeObject SerializationSettings.Default message, deliverer) | |
return! ignored () | |
} | |
loop () |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment