Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Last active August 29, 2015 14:21
Show Gist options
  • Save hodzanassredin/21cedfa2815735f8880e to your computer and use it in GitHub Desktop.
Save hodzanassredin/21cedfa2815735f8880e to your computer and use it in GitHub Desktop.
hopac alternatives on top of async
module Promise =
open System.Threading.Tasks
type Promise<'a> = {signal: 'a -> bool;
future : Async<'a>;
cancel : unit -> bool}
let create<'a> () =
let tcs = new TaskCompletionSource<'a>()
let ta: Async<'a> = Async.AwaitTask tcs.Task
{signal = tcs.TrySetResult;
future = ta;
cancel = tcs.TrySetCanceled}
let never<'a> () = {create<'a> () with signal = (fun _ -> false)}
let wrapWrkfl wrkfl =
let res = create()
async{
let! r = wrkfl
res.signal(r) |> ignore
}, res.future
module ALternatives =
open System
open Promise
type Transaction<'a> = {commit:'a -> Async<bool>}
type Alt<'a> = Alt of (Transaction<'a> -> Async<unit>)
let asyncReturn x = async{return x}
let run (tran : Transaction<'a>) = function | Alt(alt) -> alt(tran)
let fromAsync wrkfl =
Alt(fun tran ->
async{
let! res = wrkfl
let! _ = tran.commit res
return ()
}
)
let asyncAliasing = Async.StartChild
let choose (one, two) =
Alt(fun tran ->
async{
let commitOnce = Promise.create<unit>()
let subCommit x = async{
if commitOnce.signal()
then return! tran.commit x
else return false}
run {commit = subCommit} one |> Async.Start
run {commit = subCommit} two |> Async.Start
}
)
let merge (one:Alt<'a>, two:Alt<'b>) =
Alt(fun tran ->
async{
let commit1 = Promise.create()
let commit2 = Promise.create()
let bothOk = Promise.create<bool>()
let commit subCommit v =
async{
subCommit.signal v |> ignore
return! bothOk.future
}
let tran1 = {commit = commit commit1}
let tran2 = {commit = commit commit2}
run tran1 one |> Async.Start
run tran2 two |> Async.Start
let! res1 = commit1.future
let! res2 = commit2.future
let! isCommited = tran.commit(res1, res2)
bothOk.signal(isCommited) |> ignore
}
)
let bind (one:Alt<'a>, f:'a -> Alt<'b>) =
Alt(fun tran ->
async{
let commit v =
async{
let sub = f v
let subCommited = Promise.create<bool>()
let commit v = async{
let! succ = tran.commit(v)
subCommited.signal(succ) |> ignore
return succ
}
run {commit = commit} sub |> Async.Start
return! subCommited.future
}
run {commit = commit} one |> Async.Start
}
)
let commit<'a,'b> (alt:Alt<'a>) : Alt<'a> =
Alt(fun (tran:Transaction<'a>) ->
async{
let nack = Promise.create<unit>()
let commit v = async{
let! _ = tran.commit(v)
return true
}
let tran = {commit = commit}
do! run tran alt
}
)
let detach<'a,'b> (alt:Alt<'a>) : Alt<'b> =
Alt(fun (tran:Transaction<'b>) ->
async{
let nack = Promise.create<unit>()
let commit (v:'a) = async{
return true
}
let tran = {commit = commit}
do! run tran alt
}
)
let always v = v |> asyncReturn |> fromAsync
let map (alt,f) = bind(alt, fun x -> always(f(x)))
let withNack (builder:Alt<'a> -> Async<Alt<'a>>) =
Alt(fun tran ->
async{
let nack = Promise.create<'a>()
let commit v = async{
let! commited = tran.commit v
if commited
then nack.cancel() |> ignore
else nack.signal(v) |> ignore
return commited
}
let tran = {commit = commit}
let! alt = builder(fromAsync(nack.future))
do! run tran alt
}
)
let wrap (alt,f) =
Alt(fun tran ->
async{
let commit v = async{
let! commited = tran.commit v
if commited then f(v)
return commited
}
do! run {commit = commit} alt
}
)
let guard g = withNack <| fun _ -> g
let delay f = guard( async{ return! f()})
let ife (pred,thenAlt, elseAlt) =
bind(pred, fun x ->
if x then thenAlt
else elseAlt)
let rec loop<'a> (start:Alt<'a>, pred: 'a -> Alt<bool>, body: 'a -> Alt<'a>) =
bind(start, fun x ->
ife(pred x, loop(body(x), pred, body), always x))
let none() = always None
let some alt = bind(alt, fun x -> always <| Some(x))
let never () = Alt(fun _ -> async{return ()})
let where (alt,f) = bind(alt, fun x ->
if f(x) then always(x)
else never())
let after ms v = async{
do! Async.Sleep(ms)
return v} |> fromAsync
let unit () = always ()
let zero () = never ()
let chooseXs xs = Seq.fold (fun x y -> choose (x,y)) (never()) xs
let mergeXs (xs:Alt<'a> seq) : Alt<'a seq> = Seq.fold (fun (x:Alt<'a seq>) (y:Alt<'a>) -> map(merge (x,y),
fun (x,y) -> seq{yield y
yield! x})) (always(Seq.empty)) xs
let pick alt =
let res = Promise.create()
let tran = {commit = fun v -> res.signal(v) |> asyncReturn}
run tran alt |> Async.Start
res.future
let rec server alt = async{
let! _ = pick alt
return! server alt
}
let private rand = new System.Random(DateTime.Now.Millisecond)
let shuffle s = Seq.sortBy(fun _ -> rand.Next()) s
open ALternatives
open System
open System.Collections.Generic
open System.Threading
let runSync alt = alt |> pick |> Async.RunSynchronously
let test x = x |> Async.RunSynchronously |> printfn "result is %A"
let testAsync x = async{let! x = x
printfn "resut is %A" x} |> Async.Start
type TransactionBuilder() =
member this.Bind(m, f) = bind(m,f)
member this.Return(x) = always x
let tranB = TransactionBuilder()
type ChooseBuilder() =
[<CustomOperation("case")>]
member this.Case(x,y) = ALternatives.choose(y,x)
member this.Yield(()) = never()
let chooseB = ChooseBuilder()
type MergeBuilder() =
[<CustomOperation("case")>]
member this.Case(x,y) = ALternatives.merge(y,x)
member this.Yield(()) = never()
let mergeB = MergeBuilder()
type AltQueryBuilder() =
member t.Zip(xs,ys) = ALternatives.merge(xs,ys)
member t.For(xs,f) = bind(xs,f)
member t.For((x,y),f) = bind(ALternatives.merge(x,y),f)
member t.For((x,y,z),f) = let tmp1 = ALternatives.merge(x,y)
let tmp2 = map(ALternatives.merge(tmp1,z), fun ((x,y),z) -> x,y,z)
bind(tmp2,f)
member t.For(x,f) = bind(ALternatives.mergeXs(x),f)
member t.Yield(x) = always(x)
member t.Zero() = never()
[<CustomOperation("where", MaintainsVariableSpace=true)>]
member x.Where
( source:Alt<'T>,
[<ProjectionParameter>] f:'T -> bool ) : Alt<'T> = where(source,f)
[<CustomOperation("select")>]
member x.Select
( source:Alt<'T>,
[<ProjectionParameter>] f:'T -> 'R) : Alt<'R> = map(source,f)
let queryB = new AltQueryBuilder()
queryB{
for x,y in (always(1),always(1))do
where (x = 1)
select (x + y)
} |> pick |> test
let add_nack alt = withNack (fun nack ->
let nack = map(nack, fun x -> printfn "nacked %A" x)
choose(alt, detach(nack)) |> asyncReturn)
let nack ()= withNack (fun nack -> async{ return choose(commit nack, always()) })
nack() |> pick |>test
[unit(); nack()] |> chooseXs |> pick |>test
always(2) |> pick |> test
always(2) |> add_nack |> pick |> test
[always(1); always(2)] |> chooseXs |> pick |> test
[always(1); always(2)] |> shuffle |> chooseXs |> pick |> test
[after 300 "300 wins";after 200 "200 wins"] |> chooseXs |> pick |> test
[after 300 300 |> add_nack;
after 200 200 |> add_nack] |> chooseXs |> pick |> test
((after 200 200 |> add_nack,
after 300 300 |> add_nack) |> choose,
(after 400 200 |> add_nack,
after 500 300 |> add_nack) |> choose) |> choose |> pick |> test
choose(fromAsync(async{do! Async.Sleep(1000)
do! Async.Sleep(1000)
return "async wins"}),
after 3000 "after wins") |> pick |> test
type internal BlockingAgentMessage<'T> =
| Add of 'T * Transaction<unit>
| Get of Transaction<'T>
and
Ch<'T>(maxLength) =
let maxLength = if maxLength <= 0
then Int32.MaxValue
else maxLength
[<VolatileField>]
let mutable count = 0
let agent = MailboxProcessor.Start(fun agent ->
let queue = new Queue<_>()
let pending = new Queue<_>()
let rec emptyQueue() =
agent.Scan(fun msg ->
match msg with
| Add(value, ctx) -> Some <| async {
let! commited = ctx.commit()
if commited then
queue.Enqueue(value)
count <- queue.Count
return! nonEmptyQueue()
else return! emptyQueue() }
| _ -> None )
and nonEmptyQueue() = async {
let! msg = agent.Receive()
match msg with
| Add(value, ctx) ->
if queue.Count < maxLength then
let! commited = ctx.commit()
if commited then
queue.Enqueue(value)
count <- queue.Count
else
pending.Enqueue(value, ctx)
return! nonEmptyQueue()
| Get(ctx) ->
let item = queue.Dequeue()
let! commited = ctx.commit(item)
if commited then
while queue.Count < maxLength && pending.Count > 0 do
let itm, ctx = pending.Dequeue()
let! commited = ctx.commit()
if commited then
queue.Enqueue(itm)
count <- queue.Count
if queue.Count = 0 then return! emptyQueue()
else return! nonEmptyQueue()
else queue.Enqueue(item)
return! nonEmptyQueue()}
emptyQueue() )
member x.Count = count
member x.AltAdd(v:'T) =
Alt(fun tran -> async{
agent.Post(Add(v, tran))
})
member x.AltGetUnsafe() =
Alt(fun tran -> async{
agent.Post(Get(tran))
})
member x.AltGet() =
withNack(fun nack ->async{
return choose(commit(x.AltGetUnsafe()),
detach(bind(nack, fun v -> x.AltAdd(v))))
})
let ch = Ch<int>(1)
let toAlways a alt = bind(alt,fun _ -> always(a))
let wrapPrint alt = wrap(alt,fun x -> printfn "wrap %A" x)
ch.AltAdd(1) |> wrapPrint |> pick |> test
ch.AltGet() |> pick |> test
ch.AltAdd(0) |> wrapPrint |> pick |> test
ch.Count
merge(always(1), always(2))|> pick |> test
[ch.AltAdd(1)|> toAlways -1; ch.AltGet()] |> chooseXs |> pick |> test
//joinads samples
let putString = Ch<string>(0)
let putInt = Ch<int>(0)
let echo = Ch<string>(0)
server (choose ( bind(putString.AltGet(), fun v -> echo.AltAdd(sprintf "Echo %s" v)),
bind(putInt.AltGet(), fun v -> echo.AltAdd(sprintf "Echo %d" v)))) |> Async.Start
wrap(echo.AltGet(), fun s -> printfn "GOT: %s" s) |> server |> Async.Start
// Put 5 values to 'putString' and 5 values to 'putInt'
async {
for i in 1 .. 5 do
do! putString.AltAdd("Hello!") |> pick
do! putInt.AltAdd(i) |> pick
} |> Async.Start
//async cancellation
let asyncWitchCancellation wrkfl =
withNack(fun nack -> async{
let cts = new CancellationTokenSource()
let wrkfl, res = Promise.wrapWrkfl(wrkfl)
Async.Start(wrkfl, cts.Token)
return choose(fromAsync res, detach(map(nack, fun _ -> printfn "async cancelled"
cts.Cancel())))
})
let wrkfl = async{
do! Async.Sleep(1000)
return "async finished"
}
(asyncWitchCancellation wrkfl, always "always finished") |> choose |> pick |> test
(asyncWitchCancellation wrkfl, never()) |> choose |> pick |> test
//fetcher
open Microsoft.FSharp.Control.WebExtensions
open System.Net
let fetchAsync (name, url:string) = async {
let uri = new System.Uri(url)
let webClient = new WebClient()
let! html = webClient.AsyncDownloadString(uri)
return sprintf "Read %d characters for %s" html.Length name
}
let fetchAlt (name, url) : Alt<string> =
fetchAsync (name, url) |> asyncWitchCancellation
let urlList = [ "Microsoft.com", "http://www.microsoft.com/"
"MSDN", "http://msdn.microsoft.com/"
"Bing", "http://www.bing.com" ]
let runFastest () =
urlList
|> Seq.map fetchAlt
|> chooseXs
|> pick
|> test
let runAll () =
urlList
|> Seq.map fetchAlt
|> mergeXs
|> pick
|> test
runFastest()
runAll()
//one place buffer
let put, get, empty, contains = Ch<string>(0), Ch<string>(0), Ch<unit>(0), Ch<string>(0)
// Initially, the buffer is empty
empty.AltAdd() |> pick |> Async.RunSynchronously
chooseB{
case (tranB{
do! empty.AltGet()
let! x = put.AltGet()
do! contains.AltAdd(x)
})
case (tranB{
let! v = contains.AltGet()
let! x = get.AltAdd(v)
do! empty.AltAdd()
})} |> server |> Async.Start
// Repeatedly try to put value into the buffer
async { do! Async.Sleep 1000
for i in 0 .. 10 do
printfn "putting: %d" i
do! pick (put.AltAdd(string i))
do! Async.Sleep 500 }
|> Async.Start
// Repeatedly read values from the buffer and print them
async { while true do
do! Async.Sleep 250
let! v = get.AltGet() |> pick
printfn "got: %s" v }
|> Async.Start
// Dinning philosophers
let n = 5
let chopsticks = [| for i = 1 to n do yield new Ch<unit>(0) |]
let hungry = [| for i = 1 to n do yield new Ch<unit>(0) |]
let philosophers = [| "Plato"; "Konfuzius"; "Socrates"; "Voltaire"; "Descartes" |]
let randomDelay (r : Random) = System.Threading.Thread.Sleep(r.Next(1, 10) * 1000)
for i = 0 to n - 1 do
let left = chopsticks.[i]
let right = chopsticks.[(i+1) % n]
let random = new Random()
queryB{
for x,y,z in (hungry.[i].AltGet(), left.AltGet(), right.AltGet()) do
select(
printfn "%s is eating" philosophers.[i]
randomDelay random
left.AltAdd() |> pick |> Async.Start
right.AltAdd() |> pick |> Async.Start
printfn "%s is thinking" philosophers.[i]
())
} |> pick |> Async.Start
// Run
for chopstick in chopsticks do
chopstick.AltAdd() |> pick |> Async.Start
let random = new Random()
while true do
hungry.[random.Next(0, n)].AltAdd() |> pick |> Async.Start
randomDelay random
@polytypic
Copy link

Note that alternatives in Hopac (or events in CML) do not form a monad. Consult the Monad laws. Also see Transactional Events.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment