-
-
Save denmerc/f9ebef0a14856cf336bb to your computer and use it in GitHub Desktop.
Mailbox Processor in Cloud Agent with automatic dead lettering and at-least-once processing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Creates and starts the mailbox-processor agent for the actor identified by `name`.
///
/// The agent's state is seeded from `GetMessageStore name` (falling back to an empty
/// record when the store holds no data). Each received item is a pair of a message
/// and a reply function; the reply function is invoked exactly once per message:
///   * `Clear`  - deletes the stored data, resets the in-memory state, replies `Completed`.
///   * `Record` - while fewer than `maxMessages` are held, appends the payload, persists
///                the new state, then replies `Completed`; otherwise replies `Abandoned`.
/// NOTE(review): `Abandoned` presumably lets the hosting cloud agent retry / dead-letter
/// the message - confirm against the CloudAgent documentation.
let CreateActor (ActorKey name) =
    MailboxProcessor.Start(fun mailbox ->
        let messageStore = GetMessageStore name

        // Upper bound on how many messages a single actor will record.
        let maxMessages = 5

        let rec loop data =
            async {
                // Wait asynchronously until we get a message + reply channel
                let! message, replyWith = mailbox.Receive()

                match message with
                | Clear ->
                    messageStore.DeleteIfExists()
                    replyWith Completed // confirm processing!
                    return! loop { Count = 0; Messages = [] }
                | Record entry when data.Count < maxMessages ->
                    let updatedData =
                        { data with
                            Count = data.Count + 1
                            Messages = data.Messages @ [ entry ] }
                    // Persist before confirming, so a confirmed message is never lost.
                    messageStore.SetData(updatedData)
                    replyWith Completed // confirm processing!
                    return! loop updatedData
                | Record _ ->
                    // bad data - the actor already holds the maximum number of messages
                    replyWith Abandoned
                    // Keep the agent alive with unchanged state; without this the workflow
                    // would finish here and later messages (e.g. Clear) would never be handled.
                    return! loop data
            }

        let data = defaultArg (messageStore.GetData()) { Count = 0; Messages = [] }
        loop data)
// Change how we start listening to the pool as well: each actor produced by CreateActor
// is wrapped in ResilientCloudAgent instead of BasicCloudAgent.
// NOTE(review): presumably disposing the returned value stops listening - confirm.
let disposable = ConnectionFactory.StartListening(cloudConnection, CreateActor >> ResilientCloudAgent)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment