Created
May 10, 2021 16:11
-
-
Save object/cbe4eb97d28c0d4eabaf278ab2d148ef to your computer and use it in GitHub Desktop.
Code used by the guaranteed-delivery actor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Message loop of the delivery actor. `lastSavedSnapshotSequenceNr` is the sequence number the
// loop was last (re-)entered with, i.e. the value recorded when a snapshot save was last requested.
// NOTE(review): indentation was lost in this listing, so the nesting of the branches below has to
// be read from the syntax alone — confirm against the original source before relying on it.
| let rec loop lastSavedSnapshotSequenceNr = | |
| actor { | |
| let! msg = mailbox.Receive() | |
// Offer every message to the deliverer first; the match below only runs when it did not handle it.
| let effect = deliverer.Receive (upcast mailbox) msg | |
| if effect.WasHandled() | |
| then | |
| return effect | |
| else | |
| return | |
| match msg with | |
// A persisted protobuf event is converted to its domain form and sent back to this actor.
| | Persisted mailbox event when (event :? Persistence.IProtoBufSerializableEvent) -> | |
| let evt = event :?> Persistence.IProtoBufSerializableEvent |> Persistence.toDomain mailbox.System | |
| mailbox.Self <! box evt | |
| ignored () | |
// A snapshot offer replaces the deliverer's delivery snapshot.
| | SnapshotOffer snap -> | |
| deliverer.DeliverySnapshot <- snap | |
| ignored () | |
| | :? SaveSnapshotSuccess -> | |
| ignored () | |
// A failed snapshot save is only logged.
| | :? SaveSnapshotFailure as e -> | |
| logErrorWithExnf mailbox e.Cause "Failed to save snapshot" | |
| ignored () | |
| | :? RecoveryCompleted -> | |
| ignored () | |
// Commands are translated to their persistence representation and persisted.
| | :? Cmd as cmd -> | |
| match cmd with | |
| | DeliverMessage(serializedPayload, recipient) -> | |
| logDebugf mailbox $"DeliverMessage [%x{serializedPayload.GetHashCode()}]" | |
| let p = Persistence.fromDomain <| Evt.MessageSent(serializedPayload, recipient) | |
| Persist (box p) :> Effect<_> | |
| | ConfirmDelivery(deliveryId) -> | |
| logDebugWithCon mailbox (Some deliveryId) $"ConfirmDelivery [%d{deliveryId}]" | |
| let p = Persistence.fromDomain <| Evt.DeliveryConfirmed(deliveryId) | |
| Persist (box p) :> Effect<_> | |
// Domain events (presumably the ones sent to Self by the Persisted branch above) drive the deliverer.
| | :? Evt as evt -> | |
| match evt with | |
| | MessageSent(serializedPayload, recipient) -> | |
| logDebugf mailbox $"MessageSent [%x{serializedPayload.GetHashCode()}]" | |
// `fac` builds the outgoing message from the payload and the value supplied by the deliverer
// (presumably the delivery id — confirm).
| let fac d = Message.createWithConfirmationId serializedPayload d | |
| deliverer.Deliver(recipient.Path, fac, mailbox.IsRecovering()) |> ignore | |
| | DeliveryConfirmed(deliveryId) -> | |
| if aprops.CountDeliveryConfirmationAttempts then | |
| logDebugWithCon mailbox (Some deliveryId) $"DeliveryConfirmed [%d{deliveryId}], attempt #%d{getDeliveryConfirmationCount deliveryId}" | |
| else | |
| logDebugWithCon mailbox (Some deliveryId) $"DeliveryConfirmed [%d{deliveryId}]" | |
| deliverer.Confirm deliveryId |> ignore // Cancel further retries | |
// While unconfirmed deliveries remain, expire old ones and log up to 5 ids below (deliveryId - 10)
// — presumably what the two arguments of getUncofirmedDeliveries mean; confirm against its definition.
| if deliverer.DeliverySnapshot.UnconfirmedDeliveries.Length > 0 then | |
| confirmExpiredDeliveries () | |
| let oldUnconfirmed = getUncofirmedDeliveries 5 (deliveryId-10L) | |
| if oldUnconfirmed.Length > 0 then | |
| logDebugWithCon mailbox (Some deliveryId) $"%d{oldUnconfirmed.Length} old unconfirmed deliveries (%s{String.Join (',' , oldUnconfirmed)}...)" | |
| else | |
| logDebugWithCon mailbox (Some deliveryId) "No unconfirmed deliveries" | |
// Request a snapshot only when the sequence number has advanced past the value this loop was
// entered with, then re-enter the loop carrying the new sequence number.
| if mailbox.LastSequenceNr () > lastSavedSnapshotSequenceNr then | |
| typed mailbox.SnapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), deliverer.DeliverySnapshot) | |
// The same snapshot metadata is also sent to `aprops.Cleaner`.
| aprops.Cleaner <! SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()) | |
| loop <| mailbox.LastSequenceNr () | |
| else | |
| ignored () | |
| | :? UnconfirmedWarning as warn -> | |
| logDebugf mailbox $"Unconfirmed %d{warn.UnconfirmedDeliveries.Length} deliveries" | |
| ignored () | |
// Reply to the sender with the current last sequence number.
| | :? GetLastSequenceNr -> | |
| mailbox.Sender () <! mailbox.LastSequenceNr () | |
| ignored () | |
| | LifecycleEvent _ -> ignored () | |
| | PersistentLifecycleEvent _ -> ignored () | |
// Anything else is reported as unhandled.
| | _ -> unhandled () | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Payload of a delivery, represented as a string.
| type DeliveryPayload = string | |
// Message envelope specialised to carry a DeliveryPayload.
| type DeliveryEnvelope = Message<DeliveryPayload> | |
/// Helpers for converting between messages and the payload of a `DeliveryEnvelope`.
module DeliveryEnvelope =

    /// Deserializes the payload of `messageEnvelope` with the
    /// `SerializationSettings.DefaultWithActorRef` settings built for `system`.
    let unwrap system (messageEnvelope : DeliveryEnvelope) =
        let settings = SerializationSettings.DefaultWithActorRef system
        messageEnvelope.Payload |> deserializeObject settings

    /// Serializes `message` with the `SerializationSettings.DefaultWithActorRef`
    /// settings built for `system`.
    let wrap system message =
        let settings = SerializationSettings.DefaultWithActorRef system
        message |> serializeObject settings
// Commands accepted by the delivery actor.
| type Cmd = | |
// Ask for a payload to be delivered to the given recipient.
| | DeliverMessage of DeliveryPayload * IActorRef<DeliveryEnvelope> | |
// Confirm the delivery with the given delivery id.
| | ConfirmDelivery of int64 | |
// Domain events of the delivery actor; each mirrors one Cmd case.
| type Evt = | |
// A payload was accepted for delivery to the given recipient.
| | MessageSent of DeliveryPayload * IActorRef<DeliveryEnvelope> | |
// The delivery with the given delivery id was confirmed.
| | DeliveryConfirmed of int64 | |
/// Sends a `ConfirmDelivery` command to `recipient` when `message` carries a
/// confirmation id; does nothing when it has none.
let confirmDelivery recipient (message : Message<_>) =
    match message.ConfirmationId with
    | Some cid -> recipient <! ConfirmDelivery cid
    | None -> ()
// Query message: the delivery actor answers it by sending its last sequence number to the sender.
| type GetLastSequenceNr = GetLastSequenceNr | |
/// Protobuf-based persistence representation of the delivery events, together
/// with the Akka serializer and journal event adapter that operate on it.
[<RequireQualifiedAccess>]
module Persistence =
    open System
    open System.IO
    open ProtoBuf
    open Akka.Serialization

    /// Marker interface for values stored through protobuf.
    [<AllowNullLiteral>]
    type IProtoBufSerializable = interface end

    /// A protobuf-serializable event that exposes the time it was created.
    [<AllowNullLiteral>]
    type IProtoBufSerializableEvent =
        inherit IProtoBufSerializable
        abstract member Timestamp : DateTimeOffset

    /// Stored form of `Evt.MessageSent`: the payload, the recipient's
    /// serialized actor path and the creation timestamp.
    [<ProtoContract;CLIMutable;NoComparison>]
    type MessageSent =
        { [<ProtoMember(1)>] Payload : string
          [<ProtoMember(2)>] Recipient : string
          [<ProtoMember(3)>] Timestamp : DateTimeOffset }
        interface IProtoBufSerializableEvent with
            member this.Timestamp = this.Timestamp
        /// Builds the stored record from the domain values, stamped with `DateTimeOffset.Now`.
        static member FromDomain(payload : string, recipient : IActorRef<DeliveryEnvelope>) =
            { Payload = payload
              Recipient = Serialization.SerializedActorPath(untyped recipient)
              Timestamp = DateTimeOffset.Now }
        /// Returns the payload and the recipient resolved from its stored path.
        /// `system` is downcast to `ExtendedActorSystem`; the cast fails for any other system type.
        member this.ToDomain(system : ActorSystem) =
            let payload = this.Payload
            let recipient = (system :?> ExtendedActorSystem).Provider.ResolveActorRef(this.Recipient) |> typed
            payload, recipient

    /// Stored form of `Evt.DeliveryConfirmed`: the delivery id and the creation timestamp.
    [<ProtoContract;CLIMutable;NoComparison>]
    type DeliveryConfirmed =
        { [<ProtoMember(1)>] DeliveryId : int64
          [<ProtoMember(2)>] Timestamp : DateTimeOffset }
        interface IProtoBufSerializableEvent with
            member this.Timestamp = this.Timestamp
        /// Builds the stored record for `deliveryId`, stamped with `DateTimeOffset.Now`.
        static member FromDomain(deliveryId : int64) =
            { DeliveryId = deliveryId
              Timestamp = DateTimeOffset.Now }
        /// Returns the stored delivery id.
        member this.ToDomain() =
            this.DeliveryId

    /// Converts a domain event into its stored representation.
    let fromDomain evt : IProtoBufSerializableEvent =
        match evt with
        | MessageSent (payload, recipient) -> MessageSent.FromDomain (payload, recipient) :> IProtoBufSerializableEvent
        | DeliveryConfirmed deliveryId -> DeliveryConfirmed.FromDomain deliveryId :> IProtoBufSerializableEvent

    /// Converts a stored event back into a domain event; fails for any
    /// stored type other than `MessageSent` and `DeliveryConfirmed`.
    let toDomain system (evt : IProtoBufSerializableEvent) =
        match evt with
        | :? MessageSent as evt -> Evt.MessageSent (evt.ToDomain system)
        | :? DeliveryConfirmed as evt -> Evt.DeliveryConfirmed (evt.ToDomain())
        | _ -> failwithf "Unsupported event %A" <| evt.GetType()

    /// Manifest strings identifying the stored type of each serialized event.
    let [<Literal>] MessageSentManifest = "MessageSent"
    let [<Literal>] DeliveryConfirmedManifest = "DeliveryConfirmed"

    /// Akka serializer (identifier 128) that writes and reads the stored
    /// events with protobuf, using the manifest to select the target type.
    type ProtobufSerializer (system:Akka.Actor.ExtendedActorSystem) =
        inherit Akka.Serialization.SerializerWithStringManifest(system)

        /// Serializes `o` with protobuf and returns the resulting bytes.
        override this.ToBinary(o:obj) =
            use stream = new MemoryStream()
            Serializer.Serialize(stream, o)
            stream.ToArray()

        /// Deserializes `bytes` into the type named by `manifest`;
        /// an unknown manifest is reported through `notImpl`.
        override this.FromBinary(bytes:byte[], manifest:string) =
            use stream = new MemoryStream(bytes)
            // `typeof` is the idiomatic operator for these non-generic records;
            // `typedefof` is meant for obtaining open generic type definitions.
            let typ =
                match manifest with
                | MessageSentManifest -> typeof<MessageSent>
                | DeliveryConfirmedManifest -> typeof<DeliveryConfirmed>
                | _ -> notImpl $"Unknown manifest in DB: %s{manifest}"
            ProtoBuf.Serializer.Deserialize(typ, stream)

        /// Returns the manifest for `o`; an unsupported type is reported through `notImpl`.
        override this.Manifest(o:obj) =
            match o with
            | :? MessageSent -> MessageSentManifest
            | :? DeliveryConfirmed -> DeliveryConfirmedManifest
            | _ -> notImpl $"Unknown type in DB: %s{o.GetType().FullName}"

        override this.Identifier with get() = 128

    /// Journal event adapter: tags every written event and lets only
    /// protobuf-serializable values through when reading.
    type EventAdapter(system : ExtendedActorSystem) =
        let ser = ProtobufSerializer(system)
        interface Akka.Persistence.Journal.IEventAdapter with
            /// Delegates to the protobuf serializer's manifest.
            member _.Manifest(evt : obj) =
                ser.Manifest(evt)
            /// Wraps the event in `Tagged` with the single tag "any".
            member _.ToJournal(evt : obj) : obj =
                // `evt` is already an `obj`, so no extra boxing is needed.
                Akka.Persistence.Journal.Tagged(evt, [| "any" |]) :> obj
            /// Yields the event when it implements `IProtoBufSerializable`,
            /// otherwise an empty sequence.
            member _.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
                // One pattern match replaces the separate type test and downcast.
                match evt with
                | :? IProtoBufSerializable as serializable ->
                    Akka.Persistence.Journal.EventSequence.Single(serializable)
                | _ ->
                    Akka.Persistence.Journal.EventSequence.Empty
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment