Last active
          August 29, 2015 14:21 
        
      - 
      
- 
        Save hodzanassredin/21cedfa2815735f8880e to your computer and use it in GitHub Desktop. 
    hopac alternatives on top of async
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  module Promise =
      open System.Threading.Tasks

      /// A one-shot cell backed by a TaskCompletionSource.
      /// `signal` tries to complete it with a value, `cancel` tries to cancel it,
      /// and `future` awaits the outcome.
      type Promise<'a> =
          { signal : 'a -> bool
            future : Async<'a>
            cancel : unit -> bool }

      /// Creates a fresh, not yet completed promise.
      let create<'a> () =
          let source = new TaskCompletionSource<'a>()
          let awaiting : Async<'a> = Async.AwaitTask source.Task
          { signal = (fun value -> source.TrySetResult value)
            future = awaiting
            cancel = (fun () -> source.TrySetCanceled()) }

      /// A promise whose `signal` always refuses the value, so `future`
      /// can only end through `cancel`.
      let never<'a> () = { create<'a> () with signal = (fun _ -> false) }

      /// Pairs a workflow with a promise: returns (runner, future) where running
      /// `runner` executes `wrkfl` and publishes its result to `future`.
      let wrapWrkfl wrkfl =
          let promise = create ()
          let runner =
              async {
                  let! outcome = wrkfl
                  promise.signal outcome |> ignore
              }
          runner, promise.future
module ALternatives =
    open System
    open Promise

    /// The continuation an alternative calls to offer a value.
    /// `commit` answers whether the offer was accepted.
    type Transaction<'a> = {commit:'a -> Async<bool>}

    /// An alternative: given a transaction, an async computation that may
    /// offer a value to it.
    type Alt<'a> = Alt of (Transaction<'a> -> Async<unit>)

    /// Lifts a plain value into an async.
    let asyncReturn x = async{return x}

    /// Runs an alternative against the given transaction.
    let run (tran : Transaction<'a>) = function | Alt(alt) -> alt(tran)

    /// Alternative that awaits `wrkfl` and offers its result; the commit
    /// answer is ignored.
    let fromAsync wrkfl =
        Alt(fun tran ->
            async{
                let! res = wrkfl
                let! _ = tran.commit res
                return ()
            }
        )

    let asyncAliasing = Async.StartChild

    /// Starts both alternatives. Only the first branch to offer a value is
    /// forwarded to the outer transaction; every later offer is answered `false`.
    let choose (one, two) =
        Alt(fun tran ->
            async{
                // one-shot guard: `signal` succeeds for the first caller only
                let commitOnce = Promise.create<unit>()
                let subCommit x = async{
                    if commitOnce.signal()
                    then return! tran.commit x
                    else return false}
                run {commit = subCommit} one |> Async.Start
                run {commit = subCommit} two |> Async.Start
            }
        )

    /// Starts both alternatives, waits for a value from each, and offers the
    /// pair to the outer transaction. Both branches receive the outer answer.
    let merge (one:Alt<'a>, two:Alt<'b>) =
        Alt(fun tran ->
            async{
                let commit1 = Promise.create()
                let commit2 = Promise.create()
                let bothOk = Promise.create<bool>()
                // each branch records its value, then waits for the joint verdict
                let commit subCommit v =
                    async{
                        subCommit.signal v |> ignore
                        return! bothOk.future
                    }
                let tran1 = {commit = commit commit1}
                let tran2 = {commit = commit commit2}
                run tran1 one |> Async.Start
                run tran2 two |> Async.Start
                let! res1 = commit1.future
                let! res2 = commit2.future
                let! isCommited = tran.commit(res1, res2)
                bothOk.signal(isCommited) |> ignore
            }
        )

    /// Runs `one`; when it offers `v`, runs `f v` against the outer transaction.
    /// `one` is told the answer the outer transaction gave to `f v`.
    let bind (one:Alt<'a>, f:'a -> Alt<'b>) =
        Alt(fun tran ->
            async{
                let commit v =
                    async{
                        let sub = f v
                        let subCommited = Promise.create<bool>()
                        let commit v = async{
                            let! succ = tran.commit(v)
                            subCommited.signal(succ) |> ignore
                            return succ
                        }
                        run {commit = commit} sub |> Async.Start
                        return! subCommited.future
                    }
                run {commit = commit} one |> Async.Start
            }
        )

    /// Forwards offers to the outer transaction but always tells the inner
    /// alternative that its offer was accepted.
    let commit<'a,'b> (alt:Alt<'a>) : Alt<'a> =
        Alt(fun (tran:Transaction<'a>) ->
            async{
                let commit v = async{
                    let! _ = tran.commit(v)
                    return true
                }
                let tran = {commit = commit}
                do! run tran alt
            }
        )

    /// Runs `alt` with every offer accepted and nothing forwarded, so the
    /// resulting alternative never offers a value to the outer transaction.
    let detach<'a,'b> (alt:Alt<'a>) : Alt<'b> =
        Alt(fun (tran:Transaction<'b>) ->
            async{
                let commit (v:'a) = async{
                    return true
                }
                let tran = {commit = commit}
                do! run tran alt
            }
        )

    /// Alternative that immediately offers `v`.
    let always v = v |> asyncReturn |> fromAsync

    /// Transforms the offered value with `f`.
    let map (alt,f) = bind(alt, fun x -> always(f(x)))

    /// Builds an alternative from `builder`, handing it a "nack" alternative.
    /// The nack offers the rejected value when the outer transaction refuses
    /// the built alternative's offer; it is cancelled when the offer is accepted.
    let withNack (builder:Alt<'a> -> Async<Alt<'a>>) =
        Alt(fun tran ->
            async{
                let nack = Promise.create<'a>()
                let commit v = async{
                    let! commited = tran.commit v
                    if commited
                    then nack.cancel() |> ignore
                    else nack.signal(v) |> ignore
                    return commited
                }
                let tran = {commit = commit}
                let! alt = builder(fromAsync(nack.future))
                do! run tran alt
            }
        )

    /// Calls `f` on the value after the outer transaction accepted it.
    let wrap (alt,f) =
        Alt(fun tran ->
            async{
                let commit v = async{
                    let! commited = tran.commit v
                    if commited then f(v)
                    return commited
                }
                do! run {commit = commit} alt
            }
        )

    /// Alternative produced by an async computation; the nack is ignored.
    let guard g = withNack <| fun _ -> g

    /// Like `guard`, with the async produced lazily by `f`.
    let delay f = guard( async{ return! f()})

    /// Runs `pred`, then `thenAlt` or `elseAlt` depending on its value.
    let ife (pred,thenAlt, elseAlt) =
        bind(pred, fun x ->
            if x then thenAlt
            else elseAlt)

    /// Repeats `body` starting from the value of `start` while `pred` holds.
    let rec loop<'a> (start:Alt<'a>, pred: 'a -> Alt<bool>, body: 'a -> Alt<'a>) =
        bind(start, fun x ->
            ife(pred x, loop(body(x), pred, body), always x))

    let none() = always None
    let some alt = bind(alt, fun x -> always <| Some(x))

    /// Alternative that never offers a value.
    let never () = Alt(fun _ -> async{return ()})

    /// Keeps the value only when `f` holds; otherwise never offers.
    let where (alt,f) = bind(alt, fun x ->
        if f(x) then always(x)
        else never())

    /// Offers `v` after `ms` milliseconds.
    let after ms v =
        async{
            do! Async.Sleep(ms)
            return v} |> fromAsync

    let unit () = always ()
    let zero () = never ()

    /// Chooses among all alternatives in `xs` (folded onto `never()`).
    let chooseXs xs = Seq.fold (fun x y -> choose (x,y)) (never()) xs

    /// Merges all alternatives in `xs` into one that offers every value.
    /// Values are reported in the same order as `xs` (previously reversed).
    let mergeXs (xs:Alt<'a> seq) : Alt<'a seq> =
        Seq.fold
            (fun (acc:Alt<'a seq>) (next:Alt<'a>) ->
                map(merge (acc, next),
                    fun (soFar, item) -> Seq.append soFar (Seq.singleton item)))
            (always(Seq.empty))
            xs

    /// Starts `alt` and returns an async awaiting the first value it offers.
    let pick alt =
        let res = Promise.create()
        let tran = {commit = fun v -> res.signal(v) |> asyncReturn}
        run tran alt |> Async.Start
        res.future

    /// Picks from `alt` forever.
    let rec server alt = async{
        let! _ = pick alt
        return! server alt
        }

    let private rand = new System.Random(DateTime.Now.Millisecond)

    /// Reorders `s` by sorting on random keys.
    let shuffle s = Seq.sortBy(fun _ -> rand.Next()) s
| open ALternatives | |
| open System | |
| open System.Collections.Generic | |
| open System.Threading | |
/// Picks a value from `alt` and blocks the calling thread until it arrives.
let runSync alt = Async.RunSynchronously (pick alt)
/// Runs the async `x` synchronously and prints its result.
let test x =
    let outcome = Async.RunSynchronously x
    printfn "result is %A" outcome
/// Starts the async `x` in the background and prints its result when done.
/// The message now matches `test` ("result is", previously misspelled "resut is").
let testAsync x =
    async {
        let! x = x
        printfn "result is %A" x
    } |> Async.Start
/// Computation-expression builder sequencing alternatives with `bind`.
type TransactionBuilder() =
    /// `let!` / `do!`: continue with `f` once `m` has offered a value.
    member builder.Bind(m, f) = bind(m,f)
    /// `return`: an alternative that immediately offers `x`.
    member builder.Return(x) = always x

let tranB = TransactionBuilder()
/// Builder whose `case` operations are combined with `choose`.
type ChooseBuilder() =
    /// Adds one more candidate `y` to the alternatives accumulated in `x`.
    [<CustomOperation("case")>]
    member builder.Case(x,y) = ALternatives.choose(y,x)
    /// The starting point: an alternative that never offers anything.
    member builder.Yield(()) = never()

let chooseB = ChooseBuilder()
/// Builder whose `case` operations are combined with `merge`.
type MergeBuilder() =
    /// Merges one more alternative `y` with the ones accumulated in `x`.
    [<CustomOperation("case")>]
    member builder.Case(x,y) = ALternatives.merge(y,x)
    // NOTE(review): `merge` waits for a value from both sides and `never()`
    // offers none, so an alternative built with this seed looks like it can
    // never commit -- confirm whether `always ()` was intended.
    member builder.Yield(()) = never()

let mergeB = MergeBuilder()
/// Query-style builder over alternatives: `for` binds (merging tuples and
/// sequences of alternatives first), `where` filters and `select` maps.
type AltQueryBuilder() =
    member q.Zip(xs,ys) = ALternatives.merge(xs,ys)
    /// Single source: plain bind.
    member q.For(xs,f) = bind(xs,f)
    /// Pair of sources: merge them, then bind on the pair of values.
    member q.For((x,y),f) = bind(ALternatives.merge(x,y),f)
    /// Triple of sources: merge pairwise, flatten to a triple, then bind.
    member q.For((x,y,z),f) =
        let firstTwo = ALternatives.merge(x,y)
        let allThree = map(ALternatives.merge(firstTwo,z), fun ((x,y),z) -> x,y,z)
        bind(allThree,f)
    /// Sequence of sources: merge them all, then bind on the sequence of values.
    member q.For(x,f) = bind(ALternatives.mergeXs(x),f)
    member q.Yield(x) = always(x)
    member q.Zero() = never()
    [<CustomOperation("where", MaintainsVariableSpace=true)>]
    member q.Where
        ( source:Alt<'T>,
          [<ProjectionParameter>] f:'T -> bool ) : Alt<'T> = where(source,f)
    [<CustomOperation("select")>]
    member q.Select
        ( source:Alt<'T>,
          [<ProjectionParameter>] f:'T -> 'R) : Alt<'R> = map(source,f)

let queryB = new AltQueryBuilder()
// Demo: merge two constant alternatives, keep the pair when x = 1, print the sum.
queryB {
    for x,y in (always(1),always(1)) do
    where (x = 1)
    select (x + y)
}
|> pick
|> test
/// Wraps `alt` so that a rejected offer is printed as "nacked <value>".
let add_nack alt =
    withNack (fun nack ->
        let reportNack = map(nack, fun x -> printfn "nacked %A" x)
        asyncReturn (choose(alt, detach(reportNack))))
/// Alternative choosing between its own (force-committed) nack and an
/// immediately available unit value.
let nack () =
    withNack (fun nack ->
        async {
            return choose(commit nack, always())
        })
// Demos for always / choose / nack; each line picks one value and prints it.
test (pick (nack()))
chooseXs [unit(); nack()] |> pick |> test
test (pick (always(2)))
test (pick (add_nack (always(2))))
chooseXs [always(1); always(2)] |> pick |> test
chooseXs (shuffle [always(1); always(2)]) |> pick |> test
chooseXs [after 300 "300 wins"; after 200 "200 wins"] |> pick |> test
chooseXs [ add_nack (after 300 300)
           add_nack (after 200 200) ] |> pick |> test
// nested choice between two pairs of timers
choose(
    choose(add_nack (after 200 200), add_nack (after 300 300)),
    choose(add_nack (after 400 200), add_nack (after 500 300)))
|> pick
|> test
// a two-second workflow racing a three-second timer
choose(
    fromAsync(async{
        do! Async.Sleep(1000)
        do! Async.Sleep(1000)
        return "async wins"}),
    after 3000 "after wins")
|> pick
|> test
/// Messages understood by the channel agent: an item to add together with the
/// adder's transaction, or a getter's transaction waiting for an item.
type internal BlockingAgentMessage<'T> =
    | Add of 'T * Transaction<unit>
    | Get of Transaction<'T>
and
    /// A bounded FIFO channel driven by a MailboxProcessor and exposed through
    /// alternatives. A `maxLength` of zero or less means "unbounded".
    Ch<'T>(maxLength) =
    let maxLength = if maxLength <= 0
                    then Int32.MaxValue
                    else maxLength
    [<VolatileField>]
    let mutable count = 0
    let agent = MailboxProcessor.Start(fun agent ->
        let queue = new Queue<_>()
        // adds that arrived while the queue was full, with their transactions
        let pending = new Queue<_>()
        // Empty state: only Add messages are taken; Get messages stay in the
        // mailbox until an item is available.
        let rec emptyQueue() =
            agent.Scan(fun msg ->
                match msg with
                | Add(value, ctx) ->
                    Some(async {
                        let! commited = ctx.commit()
                        if commited then
                            queue.Enqueue(value)
                            count <- queue.Count
                            return! nonEmptyQueue()
                        else return! emptyQueue() })
                | _ -> None )
        and nonEmptyQueue() = async {
            let! msg = agent.Receive()
            match msg with
            | Add(value, ctx) ->
                if queue.Count < maxLength then
                    let! commited = ctx.commit()
                    if commited then
                        queue.Enqueue(value)
                        count <- queue.Count
                else
                    pending.Enqueue(value, ctx)
                return! nonEmptyQueue()
            | Get(ctx) ->
                // Peek instead of Dequeue: when the getter rejects the item it
                // must stay at the head, not be moved to the tail of the queue.
                let item = queue.Peek()
                let! commited = ctx.commit(item)
                if commited then
                    queue.Dequeue() |> ignore
                    // room was freed: admit parked adds that still commit
                    while queue.Count < maxLength && pending.Count > 0 do
                        let itm, ctx = pending.Dequeue()
                        let! commited = ctx.commit()
                        if commited then
                            queue.Enqueue(itm)
                    count <- queue.Count
                    if queue.Count = 0 then return! emptyQueue()
                    else return! nonEmptyQueue()
                else
                    return! nonEmptyQueue() }
        emptyQueue() )
    /// Number of queued items as last published by the agent.
    member x.Count = count
    /// Alternative that posts `v` to the channel; it commits when the agent
    /// accepts the item.
    member x.AltAdd(v:'T) =
        Alt(fun tran -> async{
            agent.Post(Add(v, tran))
            })
    /// Alternative that requests an item; a taken item is not restored if the
    /// surrounding choice later rejects it.
    member x.AltGetUnsafe() =
        Alt(fun tran -> async{
            agent.Post(Get(tran))
            })
    /// Like `AltGetUnsafe`, but an item rejected by the outer transaction is
    /// added back to the channel through the nack.
    member x.AltGet() =
        withNack(fun nack ->async{
            return choose(commit(x.AltGetUnsafe()),
                          detach(bind(nack, fun v -> x.AltAdd(v))))
            })
// A channel holding at most one int, plus two small helpers for the demos below.
let ch = Ch<int>(1)

/// Replaces whatever `alt` offers with the constant `a`.
let toAlways a alt = bind(alt, fun _ -> always a)

/// Prints the value of `alt` once it has been committed.
let wrapPrint alt = wrap(alt, fun committed -> printfn "wrap %A" committed)
// Channel demo: add, get, add again, inspect the count, then merge and choose.
test (pick (wrapPrint (ch.AltAdd(1))))
test (pick (ch.AltGet()))
test (pick (wrapPrint (ch.AltAdd(0))))
ch.Count
test (pick (merge(always(1), always(2))))
chooseXs [ch.AltAdd(1) |> toAlways -1; ch.AltGet()] |> pick |> test
// joinads samples
let putString = Ch<string>(0)
let putInt = Ch<int>(0)
let echo = Ch<string>(0)

// Server: whichever input channel has a value is formatted and sent to 'echo'
choose(
    bind(putString.AltGet(), fun v -> echo.AltAdd(sprintf "Echo %s" v)),
    bind(putInt.AltGet(), fun v -> echo.AltAdd(sprintf "Echo %d" v)))
|> server
|> Async.Start

// Printer: report everything arriving on 'echo'
server (wrap(echo.AltGet(), fun s -> printfn "GOT: %s" s)) |> Async.Start

// Put 5 values to 'putString' and 5 values to 'putInt'
async {
    for i in 1 .. 5 do
        do! pick (putString.AltAdd("Hello!"))
        do! pick (putInt.AltAdd(i))
} |> Async.Start
//async cancellation
/// Turns a workflow into an alternative that starts it under its own
/// cancellation token and cancels that token when the alternative is nacked.
let asyncWitchCancellation wrkfl =
    withNack(fun nack -> async{
        let cts = new CancellationTokenSource()
        let runner, result = Promise.wrapWrkfl(wrkfl)
        Async.Start(runner, cts.Token)
        let cancelOnNack =
            map(nack, fun _ ->
                printfn "async cancelled"
                cts.Cancel())
        return choose(fromAsync result, detach(cancelOnNack))
        })
// A one-second workflow, raced first against an immediate value and then
// against an alternative that never fires.
let wrkfl =
    async {
        do! Async.Sleep(1000)
        return "async finished"
    }
choose(asyncWitchCancellation wrkfl, always "always finished") |> pick |> test
choose(asyncWitchCancellation wrkfl, never()) |> pick |> test
| //fetcher | |
| open Microsoft.FSharp.Control.WebExtensions | |
| open System.Net | |
/// Downloads `url` and returns a summary line with the page length and `name`.
let fetchAsync (name, url:string) = async {
    let uri = new System.Uri(url)
    // WebClient is IDisposable: `use` releases it once the download is done
    use webClient = new WebClient()
    let! html = webClient.AsyncDownloadString(uri)
    return sprintf "Read %d characters for %s" html.Length name
    }
/// `fetchAsync` as an alternative whose download is cancelled when nacked.
let fetchAlt (name, url) : Alt<string> =
    asyncWitchCancellation (fetchAsync (name, url))
// (display name, address) pairs used by the fetch demos
let urlList =
    [ ("Microsoft.com", "http://www.microsoft.com/")
      ("MSDN", "http://msdn.microsoft.com/")
      ("Bing", "http://www.bing.com") ]
/// Fetches every URL and prints the result of whichever download commits first.
let runFastest () =
    let downloads = Seq.map fetchAlt urlList
    chooseXs downloads |> pick |> test
/// Fetches every URL and prints all results once every download has committed.
let runAll () =
    let downloads = Seq.map fetchAlt urlList
    mergeXs downloads |> pick |> test
// Run both fetch demos: first the race, then the full merge.
runFastest ()
runAll ()
//one place buffer
let put, get, empty, contains = Ch<string>(0), Ch<string>(0), Ch<unit>(0), Ch<string>(0)

// Initially, the buffer is empty
pick (empty.AltAdd()) |> Async.RunSynchronously

// Server: either move a put value into the buffer, or hand the buffered
// value to a reader and mark the buffer empty again
chooseB {
    case (tranB {
            do! empty.AltGet()
            let! incoming = put.AltGet()
            do! contains.AltAdd(incoming)
          })
    case (tranB {
            let! stored = contains.AltGet()
            do! get.AltAdd(stored)
            do! empty.AltAdd()
          })
}
|> server
|> Async.Start

// Repeatedly try to put value into the buffer
async {
    do! Async.Sleep 1000
    for i in 0 .. 10 do
        printfn "putting: %d" i
        do! pick (put.AltAdd(string i))
        do! Async.Sleep 500
}
|> Async.Start

// Repeatedly read values from the buffer and print them
async {
    while true do
        do! Async.Sleep 250
        let! v = pick (get.AltGet())
        printfn "got: %s" v
}
|> Async.Start
// Dining philosophers
let n = 5
let chopsticks = Array.init n (fun _ -> new Ch<unit>(0))
let hungry = Array.init n (fun _ -> new Ch<unit>(0))
let philosophers = [| "Plato"; "Konfuzius"; "Socrates"; "Voltaire"; "Descartes" |]

/// Blocks the current thread for a random whole number of seconds (1 to 9).
let randomDelay (r : Random) = System.Threading.Thread.Sleep(r.Next(1, 10) * 1000)

// Each philosopher waits for "hungry" plus both neighbouring chopsticks,
// eats, then puts the chopsticks back.
for i in 0 .. n - 1 do
    let left = chopsticks.[i]
    let right = chopsticks.[(i+1) % n]
    // NOTE(review): generators created back-to-back may share a time-based
    // seed on some runtimes -- confirm whether distinct delays matter here.
    let random = new Random()
    queryB {
        for x,y,z in (hungry.[i].AltGet(), left.AltGet(), right.AltGet()) do
        select(
            printfn "%s is eating" philosophers.[i]
            randomDelay random
            left.AltAdd() |> pick |> Async.Start
            right.AltAdd() |> pick |> Async.Start
            printfn "%s is thinking" philosophers.[i]
            ())
    }
    |> pick
    |> Async.Start

// Run
// Lay every chopstick on the table.
chopsticks |> Array.iter (fun chopstick -> chopstick.AltAdd() |> pick |> Async.Start)

// Forever: make a random philosopher hungry, then wait a random delay.
let random = new Random()
while true do
    hungry.[random.Next(0, n)].AltAdd() |> pick |> Async.Start
    randomDelay random
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
Note that alternatives in Hopac (or events in CML) do not form a monad. Consult the Monad laws. Also see Transactional Events.