Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Last active August 29, 2015 14:21
Show Gist options
  • Save hodzanassredin/fe0e3a04e4b59e95d863 to your computer and use it in GitHub Desktop.
Save hodzanassredin/fe0e3a04e4b59e95d863 to your computer and use it in GitHub Desktop.
immutable alts
module Promise =
open System.Threading.Tasks
type Promise<'a> = {signal: 'a -> bool;
future : Async<'a>;
cancel : unit -> bool}
let create<'a> () =
let tcs = new TaskCompletionSource<'a>()
let ta: Async<'a> = Async.AwaitTask tcs.Task
{signal = tcs.TrySetResult;
future = ta;
cancel = tcs.TrySetCanceled}
let never<'a> () = {create<'a> () with signal = (fun _ -> false)}
let wrapWrkfl wrkfl =
let res = create()
async{
let! r = wrkfl
res.signal(r) |> ignore
}, res.future
open System.Threading
module Logger =
open System
let skip = [
"choose";
"merge";
"SingleStateKeeper";
"MailboxStateKeeper";
"AltGet";
"AltAdd"
]
let agent = MailboxProcessor<string * string>.Start(fun inbox ->
let rec loop n =
async {
let! who, msg = inbox.Receive();
if List.exists(fun x -> who.StartsWith(x)) skip then ()
else printfn "%s %s: %s" (DateTime.Now.ToString()) who msg
return! loop ()
}
loop ())
let log who msg = agent.Post(who,msg)
let logf who fmt msg = agent.Post(who,sprintf fmt msg)
type Lens<'r,'a> = {get:'r->'a; set:'r * 'a -> 'r}
module Lens =
let id() = {get = id;set = fun (r,v) -> v}
let idTyped<'s>() : Lens<'s,'s> = id()
let zip a b = { get = fun r -> (a.get(r),b.get(r));
set = fun (r,(av,bv)) -> b.set(a.set(r,av), bv)}
let merge (l1: Lens<'s,'r>) (l2: Lens<'r,'v>) =
{ get = l1.get >> l2.get
set = fun (s,v) -> let r = l1.get(s)
let r2 = l2.set(r,v)
let s2 = l1.set(s,r2)
s2}
module State =
open System.Collections.Concurrent
open System.Collections.Generic
open System
type IsMutatesState = bool
type StateResp<'a> = | Result of 'a
| Die
type OpResp<'a> = | NotBlocked of 'a
| Blocked
type StateOp<'s,'r> = 's -> OpResp<'s *'r>
type ProcessId = int
type StateKeeper<'s when 's : not struct> =
abstract member Apply<'r> : ProcessId * StateOp<'s,'r> -> Async<StateResp<'r>>
abstract member Merge : StateKeeper<'s> -> Async<StateResp<unit>>
abstract member Value : 's with get
abstract member InitValue : 's with get
abstract member IsNotChanged : bool with get
abstract member Stop : unit -> unit
abstract member MergeLens : Lens<'s,'r> -> StateKeeper<'r> -> Async<StateResp<unit>>
abstract member IsThreadSafe : bool with get
abstract member GetProcessId : IsMutatesState -> Async<ProcessId>
abstract member ReleaseProcessId : ProcessId -> unit
abstract member RunningProcsCountExcludeMe : ProcessId -> Async<int>
type SingleStateKeeper<'s when 's : not struct>(value : 's,name:string) =
let name = "SingleStateKeeper " + name + " " + Guid.NewGuid().ToString()
do Logger.log name "starting"
let refCell = ref value
interface StateKeeper<'s> with
member this.Apply (_,f) =
let res = f !refCell
match res with
| NotBlocked(s,r) -> refCell := s
async{return Result(r)}
| Blocked -> async{return Die}
member this.Merge keeper =
if obj.ReferenceEquals(keeper.InitValue,!refCell)
then refCell := keeper.Value
async{return Result()}
else async{return Die}
member this.Value with get() : 's = !refCell
member this.InitValue with get() : 's = value
member this.IsNotChanged with get() : bool = obj.ReferenceEquals(value, !refCell)
member this.Stop () = ()
member this.MergeLens lens keeper =
if obj.ReferenceEquals(keeper.InitValue, lens.get(!refCell))
then refCell := lens.set(!refCell, keeper.Value)
async{return Result()}
else async{return Die}
member this.IsThreadSafe with get() : bool = false
member this.GetProcessId mutator = Async.FromContinuations(fun (cont,_,_ )->cont(0))
member this.ReleaseProcessId _ = ()
member this.RunningProcsCountExcludeMe _ = Async.FromContinuations(fun (cont,_,_)-> cont(0))
//not finished obj reference problems
type MapStateKeeper<'s, 's2 when 's : not struct and 's2 : not struct>(state:StateKeeper<'s>, lens : Lens<'s,'s2>) =
let initValue = lens.get(state.InitValue)
interface StateKeeper<'s2> with
member this.Apply<'r> (procId,f:StateOp<'s2,'r>) : Async<StateResp<'r>>=
let fmap :StateOp<'s,'r> =
fun s -> let res = f(lens.get(s))
match res with
| NotBlocked(s2,r) -> NotBlocked(lens.set(s, s2),r)
| Blocked -> Blocked
state.Apply(procId,fmap)
member this.Merge keeper = state.MergeLens lens keeper
member this.Value with get() : 's2 = lens.get(state.Value)
member this.InitValue with get() : 's2 = initValue
member this.IsNotChanged with get() : bool = state.IsNotChanged
member this.Stop () = state.Stop()
member this.MergeLens lens2 keeper = state.MergeLens (Lens.merge lens lens2) keeper
member this.IsThreadSafe with get() : bool = state.IsThreadSafe
member this.GetProcessId mutator = state.GetProcessId mutator
member this.ReleaseProcessId procId = state.ReleaseProcessId procId
member this.RunningProcsCountExcludeMe procId = state.RunningProcsCountExcludeMe procId
type internal StateMessage<'s when 's : not struct> =
| Apply of (ProcessId * (StateResp<'s> -> OpResp<'s>))
| Merge of (unit-> bool) * (unit -> unit) * (StateResp<unit> -> unit)
| Stop
| GetProcessId of AsyncReplyChannel<ProcessId>
| ReleaseProcess of ProcessId
| RunningProcsCountExcludeMe of ProcessId * AsyncReplyChannel<int>
and MailboxStateKeeper<'s when 's : not struct>(value : 's,name:string) =
let name = "MailboxStateKeeper " + name + " " + Guid.NewGuid().ToString()
do Logger.log name "starting"
let procIdGen = ref 0
let runningProcs = ref Set.empty
let retryLimit = 1000
let idle = new Queue<ProcessId * _>()
let refCell = ref value
let rec apply (f,count) =
if count > retryLimit
then //Logger.log name "State: sending die"
f(Die) |> ignore
None
else match f(Result(!refCell)) with
| NotBlocked(state) ->
//Logger.log name "State: updating state"
refCell := state
None
| Blocked -> //Logger.log name "State: blocked on apply adding to a waiting list"
Some(f,count + 1)
let rec checkIdle () =
//Logger.log name "proccessing idle queue"
let mutable resolved = false
for i in 1..idle.Count do
let procId,onHold = idle.Dequeue()
match apply(onHold) with
| Some(bad) -> idle.Enqueue(procId,bad)
| _ -> //Logger.log name "State: not bloked item from idle queue"
resolved <- true
if resolved then checkIdle ()
let killIdle () =
//Logger.log name "State: killing idle procs"
while idle.Count > 0 do
let _,(f,_) = idle.Dequeue()
f(Die) |> ignore
let rec notBlockedRunningCount excludeProcId =
let mutable count = runningProcs.Value.Count
for (procId,_) in idle.ToArray() do
if runningProcs.Value.Contains(procId) then count <- count - 1
match excludeProcId with
| None -> ()
| Some(procId) -> if runningProcs.Value.Contains(procId) then count <- count - 1
count
let createProc (inbox:MailboxProcessor<StateMessage<'s>>) =
let rec loop () =
async { let! msg = inbox.Receive()
match msg with
| Apply(procId,f) ->
//Logger.log name "State: applying msg"
let bad = apply (f,0)
match bad with
| Some(bad) -> //Logger.log name "State: locked adding to idle"
idle.Enqueue(procId,bad)
if notBlockedRunningCount None = 0 then killIdle()
| None -> checkIdle ()
return! loop()
| Merge(precondition, doAct, reply) ->
if precondition() then doAct()
reply(Result())
checkIdle ()
else reply(Die)
return! loop()
| Stop -> killIdle()
//Logger.logf name "stopping with final state %A" !refCell
return ()
| GetProcessId(reply) -> Logger.logf name "GetProcessId %A" !procIdGen
reply.Reply(!procIdGen)
runningProcs := runningProcs.Value.Add(!procIdGen)
procIdGen := !procIdGen + 1
return! loop()
| ReleaseProcess(procId) -> runningProcs := runningProcs.Value.Remove(procId)
if notBlockedRunningCount None = 0 then killIdle()
return! loop()
| RunningProcsCountExcludeMe(procId,reply) -> reply.Reply(notBlockedRunningCount (Some(procId)))
return! loop()}
loop ()
let cts = new CancellationTokenSource()
let agent = MailboxProcessor.Start(createProc)
let fToMsg (f:StateOp<'s,'r>) (holder:Promise.Promise<StateResp<'r>>) =
function
|Result(state:'s) ->
let res = f(state)
match res with
| NotBlocked(state, res) -> holder.signal(Result(res)) |> ignore
NotBlocked(state)
| Blocked -> Blocked
| Die -> holder.signal(Die) |> ignore
Blocked
let stopped = new ManualResetEvent(false);
interface StateKeeper<'s> with
member this.Apply (procId,f) =
let holder = Promise.create<StateResp<'r>>()
if cts.Token.IsCancellationRequested then holder.signal(Die) |> ignore
else //Logger.log name "State: recieving apply f"
agent.Post(Apply(procId, fToMsg f holder))
holder.future
member this.Merge keeper =
//Logger.log name "State: recieving Merge"
let holder = Promise.create<StateResp<unit>>()
if cts.Token.IsCancellationRequested then holder.signal(Die) |> ignore
else let check () = obj.ReferenceEquals(keeper.InitValue,!refCell)
let doAct () = refCell := keeper.Value
agent.Post(Merge(check, doAct, fun x -> holder.signal(x) |> ignore))
holder.future
member this.Value with get() : 's = !refCell
member this.InitValue with get() : 's = value
member this.IsNotChanged with get() : bool = obj.ReferenceEquals(value, !refCell)
member this.Stop () =
//Logger.log name "State: recieving Stop"
cts.Cancel()
agent.Post(Stop)
member this.MergeLens lens keeper =
//Logger.log name "State: recieving MergeLens"
let holder = Promise.create<StateResp<unit>>()
if cts.Token.IsCancellationRequested then holder.signal(Die) |> ignore
else let check () = obj.ReferenceEquals(keeper.InitValue,lens.get(!refCell))
let doAct () = refCell := lens.set(!refCell, keeper.Value)
agent.Post(Merge(check, doAct, fun x -> holder.signal(x) |> ignore))
holder.future
member this.IsThreadSafe with get() : bool = true
member this.GetProcessId mutator =
if mutator then agent.PostAndAsyncReply(fun replyChannel -> GetProcessId(replyChannel))
else Async.FromContinuations(fun (cont,_,_) -> cont(-1))
member this.ReleaseProcessId procId =
if procId <> -1 then agent.Post(ReleaseProcess(procId))
member this.RunningProcsCountExcludeMe procId = agent.PostAndAsyncReply(fun reply -> RunningProcsCountExcludeMe(procId,reply))
module ALternatives =
open State
open System
open Promise
type StateChangeOp<'s,'r> = 's -> 's * 'r
type TransactionResult<'r> = | Ok of 'r
| BlockedForever
| Error of Exception
type Transaction<'s,'r when 's : not struct> =
{state : StateKeeper<'s>;
commit : TransactionResult<'r> -> Async<bool>}
type Alt<'s,'r when 's : not struct> =
Alt of (ProcessId * Transaction<'s,'r> -> Async<unit>) * IsMutatesState
let isMutatesState = function | Alt(_,ismut) -> ismut
let isMutatesState2 = function | Alt(_,ismut), Alt(_,ismut2) -> ismut || ismut2
let asyncReturn x = Async.FromContinuations(fun (cont,_,_) -> cont(x))
let asyncMap wrkfl f = async{
let! r = wrkfl
return f(r)
}
let run procId tran = function | Alt(alt,_) -> alt(procId,tran) |> Async.Start
let fromAsync wrkfl =
Alt((fun (_,tran:Transaction<'s,'r>) ->
async{
try
let! res = wrkfl
let! _ = tran.commit (Ok(res))
return ()
with error -> let! _ = tran.commit (Error(error))
return ()
}),false
)
let asyncAliasing = Async.StartChild
type RestartSignal = | Done
| Restart
| ThrowError of exn
| ResolveStateProblem
let rec choose<'s,'r when 's : not struct> (one:Alt<'s,'r>, two:Alt<'s,'r>) =
Alt((fun (procId,tran:Transaction<'s,'r>) ->
async{
let commitOnce = Promise.create<unit>()
let failSnd = Promise.create<unit>()
let readyToRestart1 = Promise.create<RestartSignal>()
let readyToRestart2 = Promise.create<RestartSignal>()
let parentInitState = tran.state.Value
let rec runSub alt (restarter:Promise.Promise<RestartSignal>) =
let state = SingleStateKeeper(parentInitState, "choose") :> StateKeeper<'s>
let subCommit (x:'r TransactionResult) = async{
state.Stop()
match x with
| Ok(x) -> if commitOnce.signal() then
let! stateReply = if state.IsNotChanged then asyncReturn (Result())
else tran.state.Merge state
match stateReply with
| Result() -> Logger.log "choose" "merge is ok stopping"
restarter.signal(Done) |> ignore
return! tran.commit (Ok(x))
| Die -> Logger.log "choose" "winner merge problem initiating restart"
restarter.signal(Restart) |> ignore
return false
else restarter.signal(Restart)|> ignore
return false
| Error(exn) -> Logger.logf "Error" "choose sub commits error %A" exn
restarter.signal(ThrowError(exn))|> ignore
return false
| BlockedForever -> restarter.signal(ResolveStateProblem)|> ignore
return false
}
run 0 {state = state; commit = subCommit} alt
runSub one readyToRestart1
runSub two readyToRestart2
let restarter = async{
let! res1 = readyToRestart1.future
let! res2 = readyToRestart2.future
//Logger.logf "choose" "resolution with %A" (res1,res2)
let! runProcCount = tran.state.RunningProcsCountExcludeMe(procId)
let canResolveBlock = runProcCount > 0 || (obj.ReferenceEquals(parentInitState, tran.state.Value)|> not)
Logger.logf "choose" "parent canResolveBlock = %A" canResolveBlock
let restart() = Logger.logf "choose" "restarting with procId = %A" procId
run procId tran (choose (one,two))
match res1, res2 with
| Done,_ -> return ()
| _,Done -> return ()
| Restart,_ -> restart()
return ()
| _,Restart -> restart()
return ()
| ThrowError(exn1),ThrowError(exn2) -> let! _ = tran.commit(Error(new AggregateException(exn1,exn2)))
return ()
| ThrowError(exn),_ -> let! _ = tran.commit(Error(exn))
return ()
| _,ThrowError(exn) -> let! _ = tran.commit(Error(exn))
return ()
| _,_ -> if canResolveBlock then restart()
return ()
else Logger.logf "choose" "blocked stopping = %A" procId
let! _ = tran.commit(BlockedForever)
return ()
}
restarter |> Async.Start
}),isMutatesState2 (one,two)
)
let bind (one:Alt<'s,'a>, f:'a -> Alt<'s,'b>) : Alt<'s,'b> =
Alt((fun (procId, tran:Transaction<'s,'b>) ->
async{
let commit res =
match res with
| Ok(v) ->
let subCommited = Promise.create<bool>()
let sub = f v
let commit v = async{
let! succ = tran.commit(v)
subCommited.signal(succ) |> ignore
return succ
}
run procId {state = tran.state; commit = commit} sub
subCommited.future
| BlockedForever -> tran.commit BlockedForever
| Error(exn) -> tran.commit (Error(exn))
run procId {state = tran.state;commit = commit} one
}),true)//tod static checking
let always v = v |> asyncReturn |> fromAsync
let unit () = always ()
let zero () = never ()
let map (alt,f) = bind(alt, fun x -> always(f(x)))
let never() = Alt(fun (procId,tran) -> async{
let! _ = tran.commit(TransactionResult.BlockedForever)
return ()
}, false)
let rec whileLoop guard body =
if guard() then bind(body, fun x -> whileLoop guard body)
else always ()
let tryWith body compensation =
Alt((fun (procId,tran) ->async{
let commit v =
let subCommited = Promise.create<bool>()
let subCommit v = async{
let! succ = tran.commit(v)
subCommited.signal(succ) |> ignore
return succ}
match v with
| BlockedForever -> run procId {tran with commit = subCommit} (compensation(new Exception("BlockedForever")))
subCommited.future
| Error(exn) -> run procId {tran with commit = subCommit} (compensation(exn))
subCommited.future
| _ -> tran.commit v
run procId {tran with commit = commit} body
}),isMutatesState body)
let tryFinally body compensation =
Alt((fun (procId,tran) ->async{
let commit v =
compensation()
tran.commit(v)
run procId {tran with commit = commit} body
}),isMutatesState body)
let mergeChoose (one:Alt<'s,'r>, two:Alt<'s,'r2>) =
choose(bind(one,fun r -> map(two, fun r2 -> r,r2)),
bind(two,fun r2 -> map(one, fun r -> r,r2)))
let merge (one:Alt<'s,'a>, two:Alt<'s,'b>) : Alt<'s, 'a * 'b> =
Alt((fun (procId,tran) ->
async{
let state = if tran.state.IsThreadSafe
then tran.state
else (new MailboxStateKeeper<_>(tran.state.Value, "merge")) :> StateKeeper<_>
let! subProcId1 = if tran.state.IsThreadSafe then asyncReturn procId
else state.GetProcessId(isMutatesState one)
let! subProcId2 = state.GetProcessId(isMutatesState two)
let commit1 = Promise.create()
let commit2 = Promise.create()
let bothOk = Promise.create<bool>()
let commit procId subCommit v =
async{
subCommit.signal v |> ignore
state.ReleaseProcessId(procId)
return! bothOk.future
}
let tran1 = {state = state; commit = commit subProcId1 commit1}
let tran2 = {state = state; commit = commit subProcId2 commit2}
run subProcId1 tran1 one
run subProcId2 tran2 two
let! res1 = commit1.future
let! res2 = commit2.future
if tran.state.IsThreadSafe |> not then state.Stop()
let commit v =
async{
let stateChanged = not state.IsNotChanged
if tran.state.IsThreadSafe |> not && stateChanged
then Logger.log "merge" "merging state with not thread safe parent"
let! resp = tran.state.Merge(state)
Logger.logf "merge" "merging state with not thread safe parent response %A" resp
return ()
return! tran.commit v
}
let! isCommited = match res1, res2 with
| Ok(r), Ok(r2) -> commit <| Ok(r, r2)
| Error(exn), Error(exn2) -> commit (Error(AggregateException([exn;exn2])))
| Error(exn), _ -> commit (Error(exn))
| _, Error(exn) -> commit (Error(exn))
| _ -> commit (BlockedForever)
bothOk.signal(isCommited) |> ignore
}),isMutatesState2 (one,two)
)
let withAck (builder:Alt<'s, bool> -> Async<Alt<'s,'r>>) =
Alt((fun (procId,tran) ->
async{
let nack = Promise.create<bool>()
let commit res = async{
let! commited = tran.commit res
nack.signal(commited) |> ignore
return commited}
let tran = {commit = commit; state = tran.state}
let! alt = builder(fromAsync(nack.future))
run procId tran alt
}), true)
let wrap (alt,f) =
Alt((fun (procId,tran) ->
async{
let commit v = async{
//Logger.logf "wrap intercepting commit %A" v
let! commited = tran.commit v
if commited then f(v)
return commited
}
run procId {commit = commit; state = tran.state} alt
}), isMutatesState alt)
let guard g = withAck <| fun _ -> g
let delay f = guard( async{ return! f()})
let ife (pred,thenAlt, elseAlt) =
bind(pred, fun x ->
if x then thenAlt
else elseAlt)
let none() = always None
let some alt = bind(alt, fun x -> always <| Some(x))
let where (alt,f) = bind(alt, fun x ->
if f(x) then always x
else never ())
let after ms v = async{
do! Async.Sleep(ms)
return v} |> fromAsync
let chooseXs xs = Seq.fold (fun x y -> choose (x,y)) (never()) xs
let mergeXs (xs:Alt<'s,'r> seq) : Alt<'s,'r seq> =
Seq.fold (fun (x:Alt<'s,'r seq>) (y:Alt<'s,'r>) ->
map(merge (x,y), fun (x,y) -> seq{yield y
yield! x})) (always(Seq.empty)) xs
let mergeChooseXs (xs:Alt<'s,'r> seq) : Alt<'s,'r seq> =
Seq.fold (fun (x:Alt<'s,'r seq>) (y:Alt<'s,'r>) ->
map(mergeChoose (x,y), fun (x,y) -> seq{yield y
yield! x})) (always(Seq.empty)) xs
let pickWithResultState state alt =
let res = Promise.create()
let stateR = SingleStateKeeper(state, "pick") :> StateKeeper<_>
let tran = {state = stateR; commit = fun v ->
stateR.Stop()
res.signal(v,stateR.Value) |> ignore
match v with
| Ok(v) -> true |> asyncReturn
| _ -> false |> asyncReturn}
run 0 tran alt
res.future
let pick state alt =
async{
let! r,_ = pickWithResultState state alt
return r
}
let mapSt lens alt =
Alt((fun (procId,tran) ->async{
let state = new MapStateKeeper<_,_>(tran.state,lens)
run procId {commit = tran.commit;state = state} alt
return ()
}), isMutatesState alt)
let stateOp op =
Alt((fun (procId,tran) ->async{
let safeOp :StateOp<_,_> =
fun s -> try
match op s with
| NotBlocked(s,r) -> NotBlocked(s,Ok(r))
| Blocked -> Blocked
with exn -> NotBlocked(s,Error(exn))
let! res = tran.state.Apply(procId,safeOp)
//Logger.logf "State client: recieved response %A" res
match res with
| Result(r) -> //Logger.logf "State client: recieved response %A" r
let! _ = tran.commit(r)
return ()
| Die -> //Logger.log "State client: recieved response Die"
let! _ = tran.commit(BlockedForever)
return ()
}),true)
let private rand = new System.Random(DateTime.Now.Millisecond)
let shuffle s = Seq.sortBy(fun _ -> rand.Next()) s
open ALternatives
open System
open System.Collections.Generic
open System.Threading
type TransactionBuilder() =
member this.Bind(m, f) = bind(m,f)
member this.Return(x) = always x
member this.ReturnFrom(x) = x
//(unit -> bool) * M<'a> -> M<'a>
member this.While(guard, body) = whileLoop guard body
member this.Zero() = always()
//M<'T> * (exn -> M<'T>) -> M<'T>
member this.TryWith(body, handler) = tryWith body handler
// M<'T> * (unit -> unit) -> M<'T>
member this.TryFinally(body, compensation) = tryFinally body compensation
//'T * ('T -> M<'U>) -> M<'U> when 'U :> IDisposable
member this.Using(disposable:#System.IDisposable, body) =
let body' = body disposable
this.TryFinally(body', fun () ->
match disposable with
| null -> ()
| disp -> //Logger.log "Using" "disposing"
disp.Dispose())
//(unit -> M<'T>) -> M<'T>
member this.Delay(f) = bind(unit(), f)
member this.Run(f) = f
//seq<'a> * ('a -> M<'b>) -> M<'b>
member this.For(sequence:seq<_>, body) =
//Logger.log "For" "executing"
this.Using(sequence.GetEnumerator(),fun enum ->
//Logger.log "ForUsing" "executing"
this.While(enum.MoveNext,
this.Delay(fun () -> body enum.Current)))
let tranB = TransactionBuilder()
type ChooseBuilder() =
[<CustomOperation("case")>]
member this.Case(x,y) = ALternatives.choose(y,x)
member this.Yield(()) = never()
let chooseB = ChooseBuilder()
type MergeBuilder() =
[<CustomOperation("case")>]
member this.Case(x,y) = ALternatives.merge(y,x)
member this.Yield(()) = always()
let mergeB = MergeBuilder()
type AltQueryBuilder() =
member this.Bind(m, f) = bind(m,f)
member t.Zip(xs,ys) = ALternatives.merge(xs,ys)
member t.For(xs,f) = bind(xs,f)
member t.For((x,y),f) = bind(ALternatives.merge(x,y),f)
member t.For((x,y,z),f) = let tmp1 = ALternatives.merge(x,y)
let tmp2 = map(ALternatives.merge(tmp1,z), fun ((x,y),z) -> x,y,z)
bind(tmp2,f)
member t.For(x,f) = bind(ALternatives.mergeXs(x),f)
member t.Yield(x) = always(x)
member t.Zero() = always()
[<CustomOperation("where", MaintainsVariableSpace=true)>]
member x.Where
( source:Alt<'s,'r>,
[<ProjectionParameter>] f:'r -> bool ) : Alt<'s,'r> = where(source,f)
[<CustomOperation("select")>]
member x.Select
( source:Alt<'s,'r>,
[<ProjectionParameter>] f:'r -> 'r2) : Alt<'s,'r2> = map(source,f)
let queryB = new AltQueryBuilder()
module Ch =
open State
[<StructuredFormatDisplay("queue({AsString})")>]
type Channel<'a> = {
name :string;
maxCount : int option;
xs:'a list;
rxs:'a list}
with member x.AsString = sprintf "%s %A" x.name (List.append x.rxs (List.rev x.xs))
member x.Count = x.xs.Length + x.rxs.Length
member q.IsEmpty = q.Count = 0
member q.Put x =
if q.maxCount.IsSome && q.Count = q.maxCount.Value then Blocked
else NotBlocked( {name = q.name; maxCount = q.maxCount; xs = q.xs; rxs = x::q.rxs} )
member q.Get () =
if q.IsEmpty then Blocked
else match q.xs with
| [] -> let sub = {name = q.name; maxCount = q.maxCount; xs = (List.rev q.rxs) ;rxs = []}
sub.Get ()
| y::ys -> NotBlocked( {name = q.name; maxCount = q.maxCount; xs = ys ;rxs = q.rxs}, y)
let create limit name = {name = name; maxCount = limit; xs = [];rxs = []}
let EmptyUnbounded name = create None name
let EmptyBounded limit name= create (Some(limit)) name
open Ch
open System.Runtime.CompilerServices
open State
[<Extension>]
type ChEx () =
[<Extension>]
static member inline AltAdd(qlens: Lens<'s, Channel<'v>>, x) =
let changeF state =
let ch : Channel<_> = qlens.get state
//Logger.logf "AltAdd" "putting to channel %A" (ch,x)
let ch = ch.Put x
match ch with
| NotBlocked(ch) ->
Logger.logf "AltAdd" "putting to channel is ok %A" ch
let nState = qlens.set(state,ch)
NotBlocked(nState, ())
| Blocked -> Logger.logf "AltAdd" "putting to channel is blocked %A" (ch,x)
Blocked
stateOp changeF
[<Extension>]
static member inline altGet(qlens: Lens<'s, Channel<'v>>) =
let changeF state=
let ch : Channel<_> = qlens.get state
//Logger.logf "altGet" "changeF getting from channel"
let res = ch.Get()
match res with
| NotBlocked(ch,res) ->
Logger.logf "AltGet" "getting from channel is ok %A" (ch,res)
let nState = qlens.set(state,ch)
NotBlocked(nState, res)
| Blocked -> Logger.logf "AltGet" "getting from channel is blocked %A" ch
Blocked
stateOp changeF
let runSync state alt = alt |> pick state |> Async.RunSynchronously
let test x = x |> Async.RunSynchronously |> Logger.logf "test result" "result is %A"
let testAsync x = async{let! x = x
Logger.logf "testAsync" "resut is %A" x} |> Async.Start
queryB{
for x,y in (always(1),always(1))do
where (x = 2)
select (x + y)
} |> pick () |> test
let ignoreAsyncRes w =
async{
let! _ = w
return ()
}
let add_nack (alt:Alt<'s,'r>) =
withAck (fun (nack : Alt<'s, bool>) ->
let nack = map(nack, fun x -> if not x then Logger.log "nacker" "nacked"
Unchecked.defaultof<'r>)
Logger.log "nacker" "starting nack interception"
asyncReturn <| choose(alt,nack)
)
always(2) |> pick () |> test
always(2) |> add_nack |> pick () |> test
(always(1), always(2)) |> choose |> pick () |> test
[always(1); always(2)] |> chooseXs |> pick () |> test
[always(1); always(2)] |> shuffle |> chooseXs |> pick () |> test
[after 300 "300 wins";after 200 "200 wins"] |> chooseXs |> pick () |> test
let error ms = async{
do! Async.Sleep(ms)
failwith "problem"
} |> fromAsync
error 100 |> pick () |> test
choose(always(),error 100)|> pick () |> test
choose(after 300 (),error 100)|> pick () |> test
choose(error 300 ,error 100)|> pick () |> test
choose(never(),error 100)|> pick () |> test
[after 300 300 |> add_nack;
after 200 200 |> add_nack] |> chooseXs |> pick () |> test
((after 200 200 |> add_nack,
after 300 300 |> add_nack) |> choose,
(after 400 200 |> add_nack,
after 500 300 |> add_nack) |> choose) |> choose |> pick () |> test
choose(fromAsync(async{do! Async.Sleep(1000)
do! Async.Sleep(1000)
return "async wins"}),
after 3000 "after wins") |> pick () |> test
let toAlways a alt = bind(alt,fun _ -> always(a))
let wrapPrint alt = wrap(alt,fun x -> Logger.logf "wrapPrint" "commited succ %A" x)
let St : Ch.Channel<int> = Ch.EmptyBounded(1) "channel"
let badLens = {get = fun _ ->failwith "lens bug";
set = fun (r,v) -> v}
let id_lens = Lens.idTyped<Ch.Channel<int>>()
ChEx.AltAdd(badLens, 1) |> pickWithResultState St |> test
ChEx.AltAdd(id_lens, 1) |> pickWithResultState St |> test
bind(ChEx.AltAdd(id_lens, 1), fun _ -> ChEx.altGet id_lens) |> pickWithResultState St |> test
bind(ChEx.AltAdd(id_lens, 1), fun _ -> ChEx.AltAdd(id_lens, 1)) |> pickWithResultState St |> test
ChEx.AltAdd (id_lens, 0) |> wrapPrint |> pickWithResultState St |> test
ChEx.altGet id_lens |> wrapPrint |> pickWithResultState St |> test
bind(ChEx.altGet id_lens, fun _ -> ChEx.AltAdd(id_lens, 1)) |> pickWithResultState St |> test
merge(ChEx.altGet id_lens, ChEx.AltAdd (id_lens, 1))|> pickWithResultState St |> test
merge(always(1), always(2))|> pick () |> test
[ChEx.AltAdd(id_lens,1) |> toAlways -1; ChEx.altGet id_lens] |> chooseXs |> pickWithResultState St |> test
//joinads samples
type St2 =
{ putStringC: Ch.Channel<string>;
putIntC: Ch.Channel<int>;
echoC: Ch.Channel<string>}
static member putString =
{ get = fun r -> r.putStringC;
set = fun (r,v) -> { r with putStringC = v }}
static member putInt =
{ get = fun r -> r.putIntC;
set = fun (r,v) -> { r with putIntC = v }}
static member echo =
{ get = fun r -> r.echoC;
set = fun (r,v) -> { r with echoC = v }}
let state = {putStringC = Ch.EmptyUnbounded "putStringC"
putIntC = Ch.EmptyUnbounded "putIntC"
echoC = Ch.EmptyUnbounded "echoC"}
let rec whileOk alt = tranB{
do! alt
return! whileOk alt
}
let getPutString = tranB{
let! v = ChEx.altGet St2.putString
do! ChEx.AltAdd (St2.echo, sprintf "Echo %s" v)
}
let getPutInt = tranB{
let! v = ChEx.altGet St2.putInt
do! ChEx.AltAdd (St2.echo, sprintf "Echo %d" v)
}
let getPut = choose(getPutString, getPutInt)
let getEcho = tranB{
let! s = ChEx.altGet St2.echo
Logger.logf "getEcho" "GOT: %A" s
}
// Put 5 values to 'putString' and 5 values to 'putInt'
let put5 =tranB {
for i in [1 .. 5] do
Logger.logf "put5" "iter %d" i
do! ChEx.AltAdd (St2.putString ,sprintf "Hello %d!" i)
do! ChEx.AltAdd (St2.putInt,i)}
put5 |> pickWithResultState state |> test
isMutatesState (getPut)
merge(whileOk getPut, put5)|> pickWithResultState state |> test
merge(whileOk getEcho ,merge(put5,whileOk getPut))|> pickWithResultState state |> test
mergeXs[whileOk getEcho; put5;whileOk getPut]|> pickWithResultState state |> test
mergeB{
case put5
case (whileOk getPut)
case (whileOk getEcho)
} |> pickWithResultState state |> test
//async cancellation
let asyncWitchCancellation wrkfl =
withAck(fun nack -> async{
let cts = new CancellationTokenSource()
let wrkfl, res = Promise.wrapWrkfl(wrkfl)
Async.Start(wrkfl, cts.Token)
let nack = map(nack, fun commited ->
if not commited then printfn "async cancelled"
cts.Cancel())
async{
let! _ = pick () nack
return ()
} |> Async.Start
return fromAsync res
})
let wrkfl = async{
do! Async.Sleep(1000)
return "async finished"
}
(asyncWitchCancellation wrkfl, always "always finished") |> choose |> pick ()|> test
(asyncWitchCancellation wrkfl, never()) |> choose |> pick () |> test
//fetcher
open Microsoft.FSharp.Control.WebExtensions
open System.Net
let fetchAsync (name, url:string) = async {
let uri = new System.Uri(url)
let webClient = new WebClient()
let! html = webClient.AsyncDownloadString(uri)
return sprintf "Read %d characters for %s" html.Length name
}
let fetchAlt (name, url) : Alt<'s,string> =
fetchAsync (name, url) |> asyncWitchCancellation
let urlList = [ "Microsoft.com", "http://www.microsoft.com/"
"MSDN", "http://msdn.microsoft.com/"
"Bing", "http://www.bing.com" ]
let runFastest () =
urlList
|> Seq.map fetchAlt
|> chooseXs
|> pick ()
|> test
let runAll () =
urlList
|> Seq.map fetchAlt
|> mergeXs
|> pick ()
|> test
runFastest()
runAll()
//one place buffer
type St3 =
{ putC: Ch.Channel<string>;
getC: Ch.Channel<string>;
emptyC: Ch.Channel<unit>;
containsC: Ch.Channel<string>}
static member put =
{ get = fun r -> r.putC;
set = fun (r,v) -> { r with putC = v }}
static member get =
{ get = fun r -> r.getC;
set = fun (r,v) -> { r with getC = v }}
static member empty =
{ get = fun r -> r.emptyC;
set = fun (r,v) -> { r with emptyC = v }}
static member contains =
{ get = fun r -> r.containsC;
set = fun (r,v) -> { r with containsC = v }}
let stateSt3 = { putC = Ch.EmptyUnbounded "putC"
getC = Ch.EmptyUnbounded "getC"
emptyC = Ch.EmptyUnbounded "emptyC"
containsC = Ch.EmptyUnbounded "containsC"}
let add_empty = ChEx.AltAdd (St3.empty, ())
let alts = chooseB{
case (tranB{
do! ChEx.altGet St3.empty
let! x = ChEx.altGet St3.put
do! ChEx.AltAdd (St3.contains,x)
})
case (tranB{
let! v = ChEx.altGet St3.contains
do! ChEx.AltAdd (St3.get,v)
do! ChEx.AltAdd (St3.empty,())
})}
let put = tranB {
do! fromAsync <| Async.Sleep 1000
for i in 0 .. 10 do
Logger.logf "put" "putting: %d" i
do! ChEx.AltAdd (St3.put,string i)
do! fromAsync <| Async.Sleep 500 }
let got = tranB {
do! fromAsync <| Async.Sleep 250
let! v = ChEx.altGet St3.get
Logger.logf "got" "got: %s" v
}
mergeXs [whileOk got; put; whileOk alts; add_empty] |> pick stateSt3 |> test
// Dinning philosophers
let n = 5
let mapReplace k v map =
let r = Map.remove k map
Map.add k v r
type St4 =
{ chopsticksCs: Map<int,Ch.Channel<unit>>;
hungryC: Map<int,Ch.Channel<unit>>;}
static member chopsticks i =
{ get = fun r -> Logger.logf "philosophers" "getting chopsticksCs %d " i
r.chopsticksCs.[i];
set = fun (r,v) -> {r with chopsticksCs = mapReplace i v r.chopsticksCs}}
static member hungry i =
{ get = fun r -> Logger.logf "philosophers" "getting hungry %d " i
r.hungryC.[i];
set = fun (r,v) -> {r with hungryC = mapReplace i v r.hungryC}}
let phioSt = {chopsticksCs = [ for i = 1 to n do yield i, Ch.EmptyUnbounded("chopsticksCs")] |> Map.ofList
hungryC = [ for i = 1 to n do yield i, Ch.EmptyBounded 1 "hungryC" ] |> Map.ofList}
let philosophers = [| "Plato"; "Konfuzius"; "Socrates"; "Voltaire"; "Descartes" |]
let randomDelay (r : Random) = Async.Sleep(r.Next(1, 3) * 1000) |> fromAsync
let queries = Array.ofSeq (seq{
for i = 1 to n do
Logger.logf "philosophers" "left %d " i
let left = St4.chopsticks i
Logger.logf "philosophers" "left %d "(i % n + 1)
let right = St4.chopsticks (i % n + 1)
let random = new Random()
//yield merge(always(i,random,left,right),merge(ChEx.altGet (St4.hungry i), merge(ChEx.altGet left, ChEx.altGet right)))
yield queryB{
for _,_,_ in (ChEx.altGet (St4.hungry i), ChEx.altGet left, ChEx.altGet right) do
select(i,random,left,right)
}
})
let findAndDo = tranB{
let! i,random,left,right = chooseXs(queries)
Logger.logf "philosophers" "%d wins " i
Logger.logf "philosophers" "%s is eating" philosophers.[i-1]
do! randomDelay random
do! ChEx.AltAdd (left,())
do! ChEx.AltAdd (right, ())
Logger.logf "philosophers" "%s is thinking" philosophers.[i-1]
return ()
}
let add_chopsticks = tranB{
for i in 1..n do
//Logger.logf "philosophers" "adding chopstick %d" i
do! ChEx.AltAdd(St4.chopsticks i, ())
}
let random = new Random()
let hungrySet = tranB{
let i = random.Next(1, n)
Logger.logf "philosophers" "set hungry %s" philosophers.[i]
do! ChEx.AltAdd (St4.hungry(i),())
do! randomDelay random
}
mergeXs [whileOk findAndDo;whileOk hungrySet;add_chopsticks] |> pickWithResultState phioSt |> test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment