Skip to content

Instantly share code, notes, and snippets.

@object
Created May 10, 2021 16:11
Show Gist options
  • Select an option

  • Save object/cbe4eb97d28c0d4eabaf278ab2d148ef to your computer and use it in GitHub Desktop.

Select an option

Save object/cbe4eb97d28c0d4eabaf278ab2d148ef to your computer and use it in GitHub Desktop.
Code used by Guaranteed delivery actor
let rec loop lastSavedSnapshotSequenceNr =
actor {
let! msg = mailbox.Receive()
let effect = deliverer.Receive (upcast mailbox) msg
if effect.WasHandled()
then
return effect
else
return
match msg with
| Persisted mailbox event when (event :? Persistence.IProtoBufSerializableEvent) ->
let evt = event :?> Persistence.IProtoBufSerializableEvent |> Persistence.toDomain mailbox.System
mailbox.Self <! box evt
ignored ()
| SnapshotOffer snap ->
deliverer.DeliverySnapshot <- snap
ignored ()
| :? SaveSnapshotSuccess ->
ignored ()
| :? SaveSnapshotFailure as e ->
logErrorWithExnf mailbox e.Cause "Failed to save snapshot"
ignored ()
| :? RecoveryCompleted ->
ignored ()
| :? Cmd as cmd ->
match cmd with
| DeliverMessage(serializedPayload, recipient) ->
logDebugf mailbox $"DeliverMessage [%x{serializedPayload.GetHashCode()}]"
let p = Persistence.fromDomain <| Evt.MessageSent(serializedPayload, recipient)
Persist (box p) :> Effect<_>
| ConfirmDelivery(deliveryId) ->
logDebugWithCon mailbox (Some deliveryId) $"ConfirmDelivery [%d{deliveryId}]"
let p = Persistence.fromDomain <| Evt.DeliveryConfirmed(deliveryId)
Persist (box p) :> Effect<_>
| :? Evt as evt ->
match evt with
| MessageSent(serializedPayload, recipient) ->
logDebugf mailbox $"MessageSent [%x{serializedPayload.GetHashCode()}]"
let fac d = Message.createWithConfirmationId serializedPayload d
deliverer.Deliver(recipient.Path, fac, mailbox.IsRecovering()) |> ignore
| DeliveryConfirmed(deliveryId) ->
if aprops.CountDeliveryConfirmationAttempts then
logDebugWithCon mailbox (Some deliveryId) $"DeliveryConfirmed [%d{deliveryId}], attempt #%d{getDeliveryConfirmationCount deliveryId}"
else
logDebugWithCon mailbox (Some deliveryId) $"DeliveryConfirmed [%d{deliveryId}]"
deliverer.Confirm deliveryId |> ignore // Cancel further retries
if deliverer.DeliverySnapshot.UnconfirmedDeliveries.Length > 0 then
confirmExpiredDeliveries ()
let oldUnconfirmed = getUncofirmedDeliveries 5 (deliveryId-10L)
if oldUnconfirmed.Length > 0 then
logDebugWithCon mailbox (Some deliveryId) $"%d{oldUnconfirmed.Length} old unconfirmed deliveries (%s{String.Join (',' , oldUnconfirmed)}...)"
else
logDebugWithCon mailbox (Some deliveryId) "No unconfirmed deliveries"
if mailbox.LastSequenceNr () > lastSavedSnapshotSequenceNr then
typed mailbox.SnapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), deliverer.DeliverySnapshot)
aprops.Cleaner <! SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ())
loop <| mailbox.LastSequenceNr ()
else
ignored ()
| :? UnconfirmedWarning as warn ->
logDebugf mailbox $"Unconfirmed %d{warn.UnconfirmedDeliveries.Length} deliveries"
ignored ()
| :? GetLastSequenceNr ->
mailbox.Sender () <! mailbox.LastSequenceNr ()
ignored ()
| LifecycleEvent _ -> ignored ()
| PersistentLifecycleEvent _ -> ignored ()
| _ -> unhandled ()
}
type DeliveryPayload = string
type DeliveryEnvelope = Message<DeliveryPayload>
module DeliveryEnvelope =
let unwrap system (messageEnvelope : DeliveryEnvelope) =
deserializeObject (SerializationSettings.DefaultWithActorRef system) messageEnvelope.Payload
let wrap system message =
serializeObject (SerializationSettings.DefaultWithActorRef system) message
type Cmd =
| DeliverMessage of DeliveryPayload * IActorRef<DeliveryEnvelope>
| ConfirmDelivery of int64
type Evt =
| MessageSent of DeliveryPayload * IActorRef<DeliveryEnvelope>
| DeliveryConfirmed of int64
let confirmDelivery recipient (message : Message<_>) =
message.ConfirmationId
|> Option.iter (fun cid -> recipient <! ConfirmDelivery cid)
type GetLastSequenceNr = GetLastSequenceNr
[<RequireQualifiedAccess>]
module Persistence =
open System
open System.IO
open ProtoBuf
open Akka.Serialization
[<AllowNullLiteral>]
type IProtoBufSerializable = interface end
[<AllowNullLiteral>]
type IProtoBufSerializableEvent =
inherit IProtoBufSerializable
abstract member Timestamp : DateTimeOffset
[<ProtoContract;CLIMutable;NoComparison>]
type MessageSent =
{ [<ProtoMember(1)>] Payload : string
[<ProtoMember(2)>] Recipient : string
[<ProtoMember(3)>] Timestamp : DateTimeOffset }
interface IProtoBufSerializableEvent with
member this.Timestamp = this.Timestamp
static member FromDomain(payload : string, recipient : IActorRef<DeliveryEnvelope>) =
{ Payload = payload
Recipient = Serialization.SerializedActorPath(untyped recipient)
Timestamp = DateTimeOffset.Now }
member this.ToDomain(system : ActorSystem) =
let payload = this.Payload
let recipient = (system :?> ExtendedActorSystem).Provider.ResolveActorRef(this.Recipient) |> typed
payload, recipient
[<ProtoContract;CLIMutable;NoComparison>]
type DeliveryConfirmed =
{ [<ProtoMember(1)>] DeliveryId : int64
[<ProtoMember(2)>] Timestamp : DateTimeOffset }
interface IProtoBufSerializableEvent with
member this.Timestamp = this.Timestamp
static member FromDomain(deliveryId : int64) =
{ DeliveryId = deliveryId
Timestamp = DateTimeOffset.Now }
member this.ToDomain() =
this.DeliveryId
let fromDomain evt : IProtoBufSerializableEvent =
match evt with
| MessageSent (payload, recipient) -> MessageSent.FromDomain (payload, recipient) :> IProtoBufSerializableEvent
| DeliveryConfirmed deliveryId -> DeliveryConfirmed.FromDomain deliveryId :> IProtoBufSerializableEvent
let toDomain system (evt : IProtoBufSerializableEvent) =
match evt with
| :? MessageSent as evt -> Evt.MessageSent (evt.ToDomain system)
| :? DeliveryConfirmed as evt -> Evt.DeliveryConfirmed (evt.ToDomain())
| _ -> failwithf "Unsupported event %A" <| evt.GetType()
let [<Literal>] MessageSentManifest = "MessageSent"
let [<Literal>] DeliveryConfirmedManifest = "DeliveryConfirmed"
type ProtobufSerializer (system:Akka.Actor.ExtendedActorSystem) =
inherit Akka.Serialization.SerializerWithStringManifest(system)
override this.ToBinary(o:obj) =
use stream = new MemoryStream()
Serializer.Serialize(stream, o)
stream.ToArray()
override this.FromBinary(bytes:byte[], manifest:string) =
use stream = new MemoryStream(bytes)
let typ =
match manifest with
| MessageSentManifest -> typedefof<MessageSent>
| DeliveryConfirmedManifest -> typedefof<DeliveryConfirmed>
| _ -> notImpl $"Unknown manifest in DB: %s{manifest}"
ProtoBuf.Serializer.Deserialize(typ, stream)
override this.Manifest(o:obj) =
match o with
| :? MessageSent -> MessageSentManifest
| :? DeliveryConfirmed -> DeliveryConfirmedManifest
| _ -> notImpl $"Unknown type in DB: %s{o.GetType().FullName}"
override this.Identifier with get() = 128
type EventAdapter(system : ExtendedActorSystem) =
let ser = ProtobufSerializer(system)
interface Akka.Persistence.Journal.IEventAdapter with
member _.Manifest(evt : obj) =
ser.Manifest(evt)
member _.ToJournal(evt : obj) : obj =
Akka.Persistence.Journal.Tagged(box evt, [| "any" |]) :> obj
member _.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
if evt :? IProtoBufSerializable then
Akka.Persistence.Journal.EventSequence.Single(evt :?> IProtoBufSerializable)
else
Akka.Persistence.Journal.EventSequence.Empty
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment