Skip to content

Instantly share code, notes, and snippets.

@atsapura
Forked from object/GuaranteedDeliveryActor.fs
Created September 9, 2020 16:40
Show Gist options
  • Save atsapura/8defe6bc34dfb60c32afe5382b9ff5c9 to your computer and use it in GitHub Desktop.
Save atsapura/8defe6bc34dfb60c32afe5382b9ff5c9 to your computer and use it in GitHub Desktop.
Guaranteed delivery actor in Akka.NET/F#
module GuaranteedDelivery =
open Akka.Actor
open Akka.Persistence
open Akkling
open Akkling.Persistence
type Payload = string
type DeliveryEnvelope = { Payload: Payload; DeliveryId: int64 }
type Cmd =
| DeliverMessage of Payload * IActorRef<DeliveryEnvelope>
| ConfirmDelivery of int64
type private Evt =
| MessageSent of Payload * IActorRef<DeliveryEnvelope>
| DeliveryConfirmed of int64
[<RequireQualifiedAccess>]
module private Persistence =
open System
open System.IO
open ProtoBuf
open Akka.Serialization
[<AllowNullLiteral>]
type IProtoBufSerializable = interface end
[<AllowNullLiteral>]
type IProtoBufSerializableEvent =
inherit IProtoBufSerializable
abstract member Timestamp : DateTimeOffset
[<ProtoContract;CLIMutable;NoComparison>]
type MessageSent =
{ [<ProtoMember(1)>] Payload : string
[<ProtoMember(2)>] Recipient : string
[<ProtoMember(3)>] Timestamp : DateTimeOffset }
interface IProtoBufSerializableEvent with
member this.Timestamp = this.Timestamp
static member FromDomain(payload : string, recipient : IActorRef<DeliveryEnvelope>) =
{ Payload = payload
Recipient = Serialization.SerializedActorPath(untyped recipient)
Timestamp = DateTimeOffset.Now }
member this.ToDomain<'T>(system : ExtendedActorSystem) =
this.Payload, system.Provider.ResolveActorRef(this.Recipient) |> typed
[<ProtoContract;CLIMutable;NoComparison>]
type DeliveryConfirmed =
{ [<ProtoMember(1)>] DeliveryId : int64
[<ProtoMember(2)>] Timestamp : DateTimeOffset }
interface IProtoBufSerializableEvent with
member this.Timestamp = this.Timestamp
static member FromDomain(deliveryId : int64) =
{ DeliveryId = deliveryId
Timestamp = DateTimeOffset.Now }
member this.ToDomain() =
this.DeliveryId
let fromDomain evt : IProtoBufSerializableEvent =
match evt with
| MessageSent (payload, recipient) -> MessageSent.FromDomain (payload, recipient) :> IProtoBufSerializableEvent
| DeliveryConfirmed deliveryId -> DeliveryConfirmed.FromDomain deliveryId :> IProtoBufSerializableEvent
let toDomain system (evt : IProtoBufSerializableEvent) =
match evt with
| :? MessageSent as evt -> Evt.MessageSent (evt.ToDomain system)
| :? DeliveryConfirmed as evt -> Evt.DeliveryConfirmed (evt.ToDomain())
| _ -> failwithf "Unsupported event %A" <| evt.GetType()
let [<Literal>] MessageSentManifest = "MessageSent"
let [<Literal>] DeliveryConfirmedManifest = "DeliveryConfirmed"
type ProtobufSerializer (system:Akka.Actor.ExtendedActorSystem) =
inherit Akka.Serialization.SerializerWithStringManifest(system)
override this.ToBinary(o:obj) =
use stream = new MemoryStream()
Serializer.Serialize(stream, o)
stream.ToArray()
override this.FromBinary(bytes:byte[], manifest:string) =
use stream = new MemoryStream(bytes)
let typ =
match manifest with
| MessageSentManifest -> typedefof<MessageSent>
| DeliveryConfirmedManifest -> typedefof<DeliveryConfirmed>
| _ -> notImpl (sprintf "Unknown manifest in DB: %s" manifest)
ProtoBuf.Serializer.Deserialize(typ, stream)
override this.Manifest(o:obj) =
match o with
| :? MessageSent -> MessageSentManifest
| :? DeliveryConfirmed -> DeliveryConfirmedManifest
| _ -> notImpl (sprintf "Unknown type in DB: %s" (o.GetType().FullName))
override this.Identifier with get() = 128
type EventAdapter(system : ExtendedActorSystem) =
let ser = ProtobufSerializer(system)
interface Akka.Persistence.Journal.IEventAdapter with
member __.Manifest(evt : obj) =
ser.Manifest(evt)
member __.ToJournal(evt : obj) : obj =
Akka.Persistence.Journal.Tagged(box evt, [| "any" |]) :> obj
member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
if evt :? IProtoBufSerializable then
Akka.Persistence.Journal.EventSequence.Single(evt :?> IProtoBufSerializable)
else
Akka.Persistence.Journal.EventSequence.Empty
let private cleanerActor connectionString (mailbox : Actor<_>) =
let rec loop () =
actor {
let! (msg : SnapshotMetadata) = mailbox.Receive()
logDebugf mailbox "Cleaning delivered messages up to [%d]" msg.SequenceNr
let commandText =
sprintf """DELETE FROM [dbo].[EventJournal] WHERE PersistenceID='%s' AND SequenceNr<=%d;
DELETE FROM [dbo].[SnapshotStore] WHERE PersistenceID='%s' AND SequenceNr<=%d"""
msg.PersistenceId msg.SequenceNr msg.PersistenceId msg.SequenceNr
use connection = new System.Data.SqlClient.SqlConnection(connectionString)
connection.Open ()
let command = connection.CreateCommand ()
command.CommandText <- commandText
command.ExecuteNonQuery () |> ignore
ignored ()
}
loop ()
let private messengerActor connectionString (mailbox : Eventsourced<_>) =
let deliverer = AtLeastOnceDelivery.createDefault mailbox
let cleaner = spawn mailbox "cleaner" (props <| cleanerActor connectionString)
let rec loop () =
actor {
let! msg = mailbox.Receive()
let effect = deliverer.Receive (upcast mailbox) msg
if effect.WasHandled()
then
return effect
else
match msg with
| :? Persistence.IProtoBufSerializableEvent as event ->
mailbox.Self <! box (event |> Persistence.toDomain (mailbox.System :?> ExtendedActorSystem))
| Persisted mailbox event when (event :? Persistence.IProtoBufSerializableEvent) -> ignored ()
| SnapshotOffer snap ->
deliverer.DeliverySnapshot <- snap
| :? SaveSnapshotSuccess -> ignored ()
| :? SaveSnapshotFailure as e ->
logErrorWithExnf mailbox e.Cause "Failed to save snapshot"
ignored ()
| PersistentLifecycleEvent e ->
match e with
| ReplayFailed (exn, message) ->
logErrorWithExn mailbox exn (string message)
ignored ()
| ReplaySucceed ->
ignored ()
| :? RecoveryCompleted ->
ignored ()
| :? Cmd as cmd ->
match cmd with
| DeliverMessage(payload, recipient) ->
let p = Persistence.fromDomain <| Evt.MessageSent(payload, recipient)
return Persist (box p) :> Effect<_>
| ConfirmDelivery(deliveryId) ->
let p = Persistence.fromDomain <| Evt.DeliveryConfirmed(deliveryId)
return Persist (box p) :> Effect<_>
| :? Evt as evt ->
match evt with
| MessageSent(payload, recipient) ->
let fac d = { Payload = payload; DeliveryId = d }
return deliverer.Deliver(recipient.Path, fac, mailbox.IsRecovering()) |> ignored
| DeliveryConfirmed(deliveryId) ->
if deliverer.Confirm deliveryId then
let snapshotStore = typed mailbox.SnapshotStore
snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), deliverer.DeliverySnapshot)
if deliverer.DeliverySnapshot.UnconfirmedDeliveries.Length = 0 then
cleaner <! SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ())
ignored ()
| LifecycleEvent _ -> ignored ()
| _ ->
logDebugf mailbox "Unable to handle [%s]" (msg.GetType().Name)
unhandled ()
}
loop ()
let guaranteedDeliveryActor<'Payload> connectionString getPersistentActorName deliveryActorProps (mailbox : Actor<_>) =
let actorName = getPersistentActorName ()
let messenger = retype (spawn mailbox actorName (propsPersist <| messengerActor connectionString))
let deliverer = spawn mailbox "deliverer" <| props deliveryActorProps
let rec loop () =
actor {
let! (message : 'Payload) = mailbox.Receive()
messenger <! DeliverMessage (serializeObject SerializationSettings.Default message, deliverer)
return! ignored ()
}
loop ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment