mavnn/4961056
Created February 15, 2013 15:27
Throttled parallel execution.
open System.Collections.Concurrent

type JobRequest<'T> =
    {
        Id : int
        WorkItem : 'T
    }

type WorkRequest<'T> =
    | Job of JobRequest<'T>
    | End

let inline doParallelWithThrottle<'a, 'b> limit f items =
    let itemArray = Seq.toArray items
    let itemCount = Array.length itemArray
    let resultMap = ConcurrentDictionary<int, 'b>()
    use block = new BlockingCollection<WorkRequest<'a>>(1)
    use completeBlock = new BlockingCollection<unit>(1)
    let monitor =
        MailboxProcessor.Start(fun inbox ->
            let rec inner complete =
                async {
                    do! inbox.Receive()
                    if complete + 1 = limit then
                        completeBlock.Add(())
                        return ()
                    else
                        return! inner <| complete + 1
                }
            inner 0)
    let createAgent () =
        MailboxProcessor.Start(
            fun inbox ->
                let rec inner () =
                    async {
                        let! request = async { return block.Take() }
                        match request with
                        | Job job ->
                            let! result = async { return f (job.WorkItem) }
                            resultMap.AddOrUpdate(job.Id, result, fun _ _ -> result) |> ignore
                            return! inner ()
                        | End ->
                            monitor.Post ()
                            return ()
                    }
                inner ()
        )
    let agents =
        [| for i in 1..limit -> createAgent() |]
    itemArray
    |> Array.mapi (fun i item -> Job { Id = i; WorkItem = item })
    |> Array.iter (block.Add)
    [1..limit]
    |> Seq.iter (fun x -> block.Add(End))
    completeBlock.Take()
    let results = Array.zeroCreate itemCount
    resultMap
    |> Seq.iter (fun kv -> results.[kv.Key] <- kv.Value)
    results
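First of all, consider what happens when `f` raises an exception. Note that within an `async`, `let! request = async { return block.Take() }` is equivalent to `let request = block.Take()`. The same goes for `let! result = async { return f (job.WorkItem) }`, which is equivalent to `let result = f (job.WorkItem)`. In other words, wrapping non-async code with `async` (unfortunately) does not make it behave asynchronously.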
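The point about `async` wrappers can be checked directly. The following is a minimal sketch and not part of the gist: `work`, `wrapped`, `failing`, and `caught` are hypothetical names introduced for illustration, with `work` standing in for `f`. It compares a call bound through `let!` on an `async { return ... }` wrapper with a plain `let` binding, and then shows that an exception thrown inside such a wrapper propagates like an ordinary one.

```fsharp
open System.Threading

// Hypothetical stand-in for the gist's `f`: ordinary synchronous code
// that also reports which thread it ran on.
let work (x: int) = x * 2, Thread.CurrentThread.ManagedThreadId

let wrapped =
    async {
        let caller = Thread.CurrentThread.ManagedThreadId
        // Wrapped form, in the style used by the gist.
        let! (viaAsync, asyncThread) = async { return work 21 }
        // Plain binding of the same call.
        let (direct, directThread) = work 21
        // First flag: same result. Second flag: both ran on the calling thread.
        return viaAsync = direct, (caller = asyncThread && caller = directThread)
    }
    |> Async.RunSynchronously

// An exception raised inside the wrapper is not isolated by it.
let failing : Async<int> =
    async {
        let! r = async { return failwith "boom" }
        return r
    }

// Async.Catch turns the propagated exception into a Choice value.
let caught =
    match Async.RunSynchronously (Async.Catch failing) with
    | Choice1Of2 _ -> None
    | Choice2Of2 ex -> Some ex.Message
```

Under these assumptions, both flags in `wrapped` come out true and `caught` holds the message of the exception, which suggests that any error handling for `f` has to be written explicitly (for example with `try ... with` or `Async.Catch` around each job) rather than expected from the wrapper.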