Skip to content

Instantly share code, notes, and snippets.

@Lanayx
Created June 13, 2019 14:17
Show Gist options
  • Save Lanayx/c2050966476ccbd075b58a813fe515e2 to your computer and use it in GitHub Desktop.
Save Lanayx/c2050966476ccbd075b58a813fe515e2 to your computer and use it in GitHub Desktop.
/// Messages understood by the Consumer's internal mailbox agent.
type MailboxMessage =
/// A message to deliver; posted from the connection handler's MessageReceived subscription.
| AddMessage of Message
/// A request for the next message; the agent answers on the supplied reply channel.
| GetMessage of AsyncReplyChannel<Message>
/// Receives messages from a ConnectionHandler and hands them out one at a time
/// through ReceiveAsync.
///
/// All state is owned by a single MailboxProcessor, so incoming messages and
/// receive requests are processed strictly one after another:
///  - an incoming message goes to the oldest waiting receiver, or is buffered
///    when nobody is waiting;
///  - a receive request is answered from the buffer, or parked until the next
///    message arrives.
type Consumer() =
    // Messages that arrived while no receiver was waiting.
    let queue = new ConcurrentQueue<Message>()
    let connectionHandler = ConnectionHandler()
    let mb = MailboxProcessor.Start(fun inbox ->
        // Reply channels of receivers that asked while the buffer was empty,
        // oldest first. Each channel is removed before it is replied to, so a
        // channel is never answered twice and later messages are not dropped;
        // queuing (rather than a single slot) keeps concurrent receivers from
        // overwriting each other.
        let waiters = System.Collections.Generic.Queue<AsyncReplyChannel<Message>>()
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                | AddMessage incoming ->
                    if waiters.Count > 0 then
                        // Someone is already waiting: deliver directly.
                        waiters.Dequeue().Reply(incoming)
                    else
                        queue.Enqueue(incoming)
                | GetMessage ch ->
                    match queue.TryDequeue() with
                    | true, buffered ->
                        ch.Reply buffered
                    | false, _ ->
                        // Nothing buffered: park the receiver until a message arrives.
                        waiters.Enqueue ch
                return! loop ()
            }
        loop ()
        )
    // Forward every message raised by the connection handler into the mailbox.
    do connectionHandler.MessageReceived.Add(fun msg ->
        mb.Post(AddMessage msg)
        )
    /// Returns a task that completes with the next available message, waiting
    /// for one to arrive if none is currently buffered.
    member this.ReceiveAsync() =
        task {
            return! mb.PostAndAsyncReply(MailboxMessage.GetMessage)
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment