Skip to content

Instantly share code, notes, and snippets.

@toburger
Last active December 20, 2015 01:59
Show Gist options
  • Save toburger/6053133 to your computer and use it in GitHub Desktop.
Save toburger/6053133 to your computer and use it in GitHub Desktop.
(* see: http://stackoverflow.com/questions/6219726/throttled-async-download-in-f *)
open System
/// Messages understood by the throttling agent's mailbox loop.
type ThrottlingAgentMessage<'a> =
/// Submit a job together with the channel on which its result is sent back.
| AddJob of Async<'a> * AsyncReplyChannel<'a>
/// Posted by a worker once its job has produced a result; carries that result
/// and the reply channel of the job it belongs to.
| CompletedJob of 'a * AsyncReplyChannel<'a>
/// Ends the agent's message loop.
| Stop
/// Runs submitted asynchronous jobs with at most `limit` of them executing
/// at the same time. Jobs that cannot start immediately wait in a FIFO queue.
///
/// `limit` must be greater than zero; otherwise no job could ever start and
/// the constructor raises `ArgumentException`.
type ThrottlingAgent<'a>(limit: int) =
    // Fail fast: with a non-positive limit every job would stay queued forever.
    do if limit <= 0 then invalidArg "limit" "limit must be greater than zero"

    // Jobs are wrapped with Async.Catch before being posted, so the agent
    // traffics in Choice<'a, exn>. A failing job therefore still produces a
    // CompletedJob message, which frees its slot and unblocks its caller.
    let agent =
        MailboxProcessor<ThrottlingAgentMessage<Choice<'a, exn>>>.Start(fun inbox ->
            // Jobs waiting for a free slot, oldest first. Only the agent's
            // own message loop reads or writes this queue.
            let pending = System.Collections.Generic.Queue<_>()

            // `running` is the number of jobs that have been started and have
            // not yet reported completion.
            let rec loop running = async {
                let! msg = inbox.Receive()
                match msg with
                | AddJob(job, reply) ->
                    pending.Enqueue((job, reply))
                    if running < limit then
                        return! startNext running
                    else
                        return! loop running
                | CompletedJob(outcome, reply) ->
                    reply.Reply outcome
                    // One slot has been freed; hand it to the oldest waiter.
                    return! startNext (running - 1)
                | Stop -> return ()
              }

            // Starts the oldest waiting job (if any) and resumes the loop.
            and startNext running = async {
                if pending.Count = 0 then
                    return! loop running
                else
                    let job, reply = pending.Dequeue()
                    async {
                        let! outcome = job
                        inbox.Post(CompletedJob(outcome, reply))
                    } |> Async.Start
                    return! loop (running + 1)
              }

            loop 0)

    /// Queues `job` and returns an async that yields the job's result once the
    /// agent has run it. If the job raises, the same exception is raised from
    /// the returned async instead of leaving the caller waiting forever.
    member __.AddJob(job: Async<'a>) : Async<'a> = async {
        let! outcome = agent.PostAndAsyncReply(fun rep -> AddJob(Async.Catch job, rep))
        match outcome with
        | Choice1Of2 value -> return value
        | Choice2Of2 err -> return raise err
    }

    /// Asks the agent to end its message loop. Jobs still waiting in the queue
    /// when the message is processed are never started, and results reported
    /// afterwards are never delivered, so call this only once all submitted
    /// jobs have completed.
    member __.Stop() = agent.Post(Stop)

    /// Runs all `jobs` through a fresh agent with the given concurrency
    /// `limit`, blocks until every job has finished, and returns the results
    /// in input order. The agent is stopped even when a job fails.
    static member RunJobs limit jobs =
        let throttler = ThrottlingAgent<'a>(limit)
        try
            jobs
            |> Seq.map throttler.AddJob
            |> Async.Parallel
            |> Async.RunSynchronously
        finally
            throttler.Stop()
@toburger
Copy link
Author

(* EXAMPLE *)

/// Returns a pseudo-random integer in the range [500, 2000); the examples
/// below use it as a simulated download time in milliseconds.
let rnd =
    let random = new Random()
    // System.Random is not thread-safe and the example jobs call this from
    // several thread-pool threads at once, so serialize access to the instance.
    fun () -> lock random (fun () -> random.Next(500, 2000))

(* Use a long-lived agent when the number of async computations is not known
   up front, e.g. while crawling a web page: jobs can be added at any time. *)
let ta = new ThrottlingAgent<_>(20)

List.init 201 (fun i ->
    let download = async {
        printfn "downloading %i..." i
        do! Async.Sleep(rnd())
        printfn "finished %i" i
        return sprintf "this is the result %i!" i
    }
    ta.AddJob download)
|> Async.Parallel
|> Async.RunSynchronously

(* Use RunJobs when the complete set of async computations is available
   up front. *)
[ for i in 0..200 ->
    async {
        printfn "downloading %i..." i
        do! Async.Sleep(rnd())
        printfn "finished %i" i
        return sprintf "this is the result %i!" i
    } ]
|> ThrottlingAgent.RunJobs 20

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment