Last active
August 15, 2020 09:36
-
-
Save isaacabraham/4e6b6bf1ab0c14a26eb4 to your computer and use it in GitHub Desktop.
Demonstrates how to use F# mailbox processors in conjunction with Azure Storage Queues.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Code to bind mailbox processors to Azure storage queues.
module AzureMailboxProcessor | |
open System | |
module private Async =
    /// Awaits a non-generic Task and yields unit once it has finished.
    /// Async.AwaitIAsyncResult only waits on the task's wait handle and never looks at
    /// the outcome, so the task is observed afterwards; otherwise a faulted or cancelled
    /// queue operation would be indistinguishable from a successful one.
    /// Raises AggregateException (via Task.Wait) when the task faulted or was cancelled.
    let AwaitTaskEmpty (task : System.Threading.Tasks.Task) =
        async {
            do! Async.AwaitIAsyncResult(task) |> Async.Ignore
            // The task has already completed here, so Wait() does not block;
            // it only rethrows whatever the task failed with.
            task.Wait()
        }
module private Option =
    /// Turns a Nullable into an option: Some when it holds a value, None otherwise.
    let fromNullable (nullable:Nullable<_>) =
        match nullable.HasValue with
        | true -> Some nullable.Value
        | false -> None

    /// Turns an option into a Nullable: None becomes the empty Nullable.
    let toNullable maybeValue =
        match maybeValue with
        | None -> Nullable()
        | Some value -> Nullable value
open Microsoft.WindowsAzure.Storage | |
open Microsoft.WindowsAzure.Storage.Queue | |
open Newtonsoft.Json | |
/// Resolves a reference to the named queue in the storage account identified by the connection string.
let private getQueue (connectionString, queueName) =
    CloudStorageAccount
        .Parse(connectionString)
        .CreateCloudQueueClient()
        .GetQueueReference(queueName)
/// Builds (but does not start) a mailbox processor whose every received message is
/// serialised to JSON and added to the given Azure storage queue.
let createQueueWriter<'a> (connectionString, queueName) =
    let queue = getQueue(connectionString, queueName)
    new MailboxProcessor<'a>(fun inbox ->
        // Receive one message, forward it to the queue, then go round again - forever.
        let rec pump () =
            async {
                let! payload = inbox.Receive()
                let queueMessage = CloudQueueMessage(payload |> JsonConvert.SerializeObject)
                do! queue.AddMessageAsync(queueMessage) |> Async.AwaitTaskEmpty
                return! pump ()
            }
        pump ())
/// The outcomes an agent can report back after handling a queue message.
type MessageProcessedStatus<'a> =
    /// Processing succeeded.
    | Completed
    /// Processing did not succeed; the message should go back on the queue to be processed again.
    | Failed
    /// The original queue message should be replaced with a new payload.
    | Update of UpdatedPayload : 'a
/// Settings that control how a queue subscription polls and leases messages.
type QueueSubscriptionOptions =
    {
        /// Delay between polling requests.
        PollTime : TimeSpan
        /// Lease time applied to a dequeued message, if any.
        LeaseLength : TimeSpan option
        /// Number of dequeues after which a message is permanently removed, if any.
        MaxDequeueCount : int option
    }
/// Deletes the message from the queue, marking it as fully handled.
let private completeMessage message (queue:CloudQueue) =
    let pendingDelete = queue.DeleteMessageAsync message
    pendingDelete |> Async.AwaitTaskEmpty
/// An F# agent that can be bound to an Azure storage queue: each message it receives pairs
/// the payload with a reply channel on which the agent reports a MessageProcessedStatus.
type AzureStorageQueueAgent<'a> =
    MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>
/// Binds a MailboxProcessor to an Azure storage queue.
/// Starts a background loop (via Async.Start) that repeatedly dequeues a message,
/// deserialises it from JSON to 'a, posts it to the agent and then acts on the
/// MessageProcessedStatus the agent replies with:
///   Completed -> the message is deleted;
///   Failed or no reply in time -> the message is left alone;
///   Update payload -> a new message carrying the payload is enqueued and the original deleted.
/// connectionString / queueName identify the queue; options controls polling, lease and dequeue limit.
let bindToQueue<'a>((connectionString, queueName), options) (agent:AzureStorageQueueAgent<'a>) =
    let queue = getQueue(connectionString, queueName)
    // Wait for the agent's reply no longer than the lease; with no lease configured, wait indefinitely.
    let replyTimeout =
        match options.LeaseLength with
        | Some lifetime -> lifetime.TotalMilliseconds |> int
        | None -> Threading.Timeout.Infinite
    async {
        while true do
            let! message = queue.GetMessageAsync(options.LeaseLength |> Option.toNullable, null, null) |> Async.AwaitTask
            match message with
            | null ->
                // Nothing was returned by the queue; pause before polling again.
                do! Async.Sleep(options.PollTime.TotalMilliseconds |> int)
            | message ->
                match message.DequeueCount, options.MaxDequeueCount with
                | count, Some limit when count >= limit ->
                    // Message dequeue count exceeded, just delete the message
                    do! queue |> completeMessage message
                | _ ->
                    try
                        // Await the reply asynchronously: the blocking TryPostAndReply would hold a
                        // thread-pool thread for up to the whole lease while the agent works.
                        let! status = agent.PostAndTryAsyncReply((fun ch -> JsonConvert.DeserializeObject<'a>(message.AsString), ch), replyTimeout)
                        match status with
                        | None // there was no reply from the agent within specified time; lease has expired.
                        | Some Failed -> () // message has failed to process; do not complete the message.
                        | Some Completed -> do! queue |> completeMessage message
                        | Some (Update payload) ->
                            // Enqueue the replacement before deleting the original, so a failure
                            // between the two steps cannot lose the message altogether.
                            do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty
                            do! queue |> completeMessage message
                    with ex -> printfn "ARGH %s" ex.Message
    } |> Async.Start
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#I @"..\packages\" | |
#r @"Newtonsoft.Json.6.0.3\lib\net45\Newtonsoft.Json.dll" | |
#r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll" | |
#load "StorageQueueAgent.fs" | |
/// Sample payload sent through the queue.
type Person =
    { Name : string
      Age : int }
open AzureMailboxProcessor | |
open System | |
// Connection string and queue name of the Azure Storage Queue used by this sample
let queueDetails =
    "UseDevelopmentStorage=true", "sample-queue"
// Reader - decides, per person received, what should happen to the queue message
let subscriber =
    new AzureStorageQueueAgent<Person>(fun inbox ->
        async {
            while true do
                let! person, replyChannel = inbox.Receive()
                // work out the outcome from the person's name
                let outcome =
                    match person.Name with
                    | "Isaac"
                    | "Richard" -> Completed
                    | "Andy" -> Update { person with Name = person.Name + "xyz" }
                    | _ -> Failed
                // hand the outcome back - the StorageQueueAgent adapts it into the appropriate Azure Storage Queue behaviour
                replyChannel.Reply outcome
        })
// Start the reader agent.
subscriber.Start()

// Bind the reader to the storage queue: 10-second poll, 5-second lease, MaxDequeueCount of 3.
bindToQueue
    (queueDetails,
     { PollTime = TimeSpan.FromSeconds 10.
       LeaseLength = Some (TimeSpan.FromSeconds 5.)
       MaxDequeueCount = Some 3 })
    subscriber

// Writer - puts messages onto the storage queue (this would e.g. take place on another machine)
let writer = createQueueWriter<Person> queueDetails
writer.Start()

// Post the sample people in order.
[ { Name = "Isaac"; Age = 34 }
  { Name = "Andy"; Age = 32 }
  { Name = "Richard"; Age = 39 }
  { Name = "Joe Bloggs"; Age = 35 } ]
|> List.iter writer.Post
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment