Created
November 24, 2014 17:07
-
-
Save devboy/96f5d029a15d8db38faa to your computer and use it in GitHub Desktop.
Some Throttle Agent in F#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace FSharp.Async | |
open System | |
open System.Collections.Generic | |
open System.Collections.Concurrent | |
/// Short alias for the F# mailbox processor used by the agents in this file.
type Agent<'T> = MailboxProcessor<'T>

/// Opaque handle identifying one unit of work. Equality is by reference, so
/// every `Token()` call yields a handle distinct from all others.
[<ReferenceEquality>]
type Token =
    | Token of unit
    interface IComparable with
        /// Returns 0 only when `other` is this very instance, otherwise -1.
        /// NOTE(review): this is not a consistent ordering (a vs b and b vs a
        /// both give -1), so tokens should not be used as keys of sorted
        /// collections.
        member this.CompareTo(other) =
            match Object.ReferenceEquals(this, other) with
            | true -> 0
            | false -> -1
/// Messages exchanged with the throttle agents. Every case carries the Token
/// of the work item it refers to and a reply channel for answering the sender.
type private ThrottleAgentMessage<'T> =
    /// Submit a value under the token; the reply carries no data.
    | Push of Token * 'T * AsyncReplyChannel<unit>
    /// Ask for the value stored under the token; the reply carries that value.
    | Pull of Token * AsyncReplyChannel<'T>
    /// Signal that the token's work is finished; the reply carries no data.
    | Free of Token * AsyncReplyChannel<unit>
    /// The token carried by the message, whichever case it is.
    member message.Token =
        match message with
        | Push(token, _, _) -> token
        | Pull(token, _) -> token
        | Free(token, _) -> token
/// Mailbox-based gate that lets at most `capacity` values occupy a slot at once.
///
/// Life cycle of a token: `PushAsync` stores its value in a slot (completing
/// only once a slot is available), `FreeAsync` releases the slot while keeping
/// the value, and `PullAsync` hands the value back and forgets the token.
/// Requests that cannot be served yet stay queued in the mailbox until the
/// agent's state allows them.
type ThrottleAgent<'T> (capacity:int) =
    // Values currently occupying a slot (pushed and not yet freed).
    let locked = new Dictionary<Token,'T>(capacity)
    // Values whose slot has been freed but which have not been pulled yet.
    let unlocked = new Dictionary<Token,'T>()

    // Mailbox scanner: returns the work to run for a message that can be
    // served right now, or None to leave the message in the queue.
    let tryHandle message =
        match message with
        | Free(token, reply) when locked.ContainsKey token && not (unlocked.ContainsKey token) ->
            Some (async {
                // Move the value from the occupied set to the released set.
                let value = locked.[token]
                locked.Remove(token) |> ignore
                unlocked.Add(token, value)
                reply.Reply() })
        | Pull(token, reply) when not (locked.ContainsKey token) && unlocked.ContainsKey token ->
            Some (async {
                let value = unlocked.[token]
                unlocked.Remove(token) |> ignore
                reply.Reply(value) })
        | Push(token, value, reply) when not (locked.ContainsKey token) && locked.Count < capacity ->
            Some (async {
                locked.Add(token, value)
                reply.Reply() })
        | _ -> None

    let agent =
        Agent.Start(fun inbox ->
            // Serve one eligible message at a time, forever.
            let rec loop () = async {
                do! inbox.Scan tryHandle
                return! loop () }
            loop ())

    /// Stores `v` under `t`; completes once a slot has been granted.
    member x.PushAsync (v:'T) (t:Token) =
        agent.PostAndAsyncReply(fun reply -> Push(t, v, reply))

    /// Returns the value stored under `t` once its slot has been freed.
    member x.PullAsync (t:Token) =
        agent.PostAndAsyncReply(fun reply -> Pull(t, reply))

    /// Releases the slot held by `t`, keeping its value available for PullAsync.
    member x.FreeAsync (t:Token) =
        agent.PostAndAsyncReply(fun reply -> Free(t, reply))
/// Runs asynchronous computations while letting at most `capacity` of them
/// execute at the same time.
///
/// `PushAsync` waits for a free slot, runs the computation and stores its
/// result under the token; `PullAsync` collects that result; `PerformAsync`
/// does both in sequence.
type AsyncThrottleAgent<'T> (capacity:int) =
    // Finished results waiting to be collected, keyed by token.
    let results = new ConcurrentDictionary<Token,'T>()
    // Gate limiting how many computations run concurrently.
    let throttle = new ThrottleAgent<Async<'T>>(capacity)
    // Agent that pairs stored results with the callers asking for them.
    // Messages that cannot be served yet stay in the mailbox (Scan semantics).
    let agent = Agent.Start(fun inbox -> async {
        let agentAction (f:unit->unit) = Some(async { f() })
        let tryEnqueue msg =
            match msg with
            | Push(t, v, ch) when not (results.ContainsKey t) ->
                agentAction (fun () ->
                    results.TryAdd(t, v) |> ignore
                    ch.Reply())
            | Pull(t, ch) when results.ContainsKey t ->
                agentAction (fun () ->
                    let v = results.[t]
                    results.TryRemove(t) |> ignore
                    ch.Reply(v))
            | Free(_, ch) -> agentAction (fun () -> ch.Reply())
            | _ -> None
        while true do
            do! inbox.Scan tryEnqueue })

    /// Waits for a free slot, runs `v`, and stores its result under `t`.
    /// If `v` raises, the slot is still released and the exception is then
    /// re-raised to the caller (no result is stored in that case).
    member x.PushAsync (v:Async<'T>) (t:Token) = async {
        do! throttle.PushAsync v t
        // Capture failures instead of letting them skip the release below;
        // otherwise a failing computation would occupy its slot forever.
        let! outcome = Async.Catch v
        do! throttle.FreeAsync t
        // Actually run the pull so the throttle forgets the token. Merely
        // creating the Async and ignoring it would never post the message,
        // leaving the entry behind in the throttle indefinitely.
        do! throttle.PullAsync t |> Async.Ignore
        let d =
            match outcome with
            | Choice1Of2 value -> value
            | Choice2Of2 ex -> raise ex
        do! agent.PostAndAsyncReply(fun ch -> Push(t, d, ch))
    }

    /// Returns the result stored under `t`, waiting until one is available.
    member x.PullAsync (t:Token) =
        agent.PostAndAsyncReply(fun ch -> Pull(t, ch))

    /// Runs `v` under the throttle and returns its result.
    member x.PerformAsync (v:Async<'T>) (t:Token) = async {
        do! x.PushAsync v t
        return! x.PullAsync t
    }
// Example: issue 101 HTTP requests with at most 15 running at the same time.
module Examples =
    let throttle = AsyncThrottleAgent<string>(15)

    // Each element is a cold async; the request and its token are only
    // created when the async actually runs.
    Seq.init 101 (fun _ ->
        async {
            try
                let request =
                    FSharp.Data.Http.AsyncRequestString("http://www.github.com", silentHttpErrors=true)
                return! throttle.PerformAsync request (Token())
            with ex ->
                // Report the failure text in place of the response body.
                return ex.Message
        })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> printfn "results: %A"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment