Created
March 31, 2020 10:54
-
-
Save AnthonyLloyd/d7716400b75aa95fc4d7e6b246287675 to your computer and use it in GitHub Desktop.
IO
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Fsion | |
open System.Threading | |
/// Cancellation token: a mutable "cancelled" flag together with the list of
/// child tokens that were derived from it. The union case is private.
[<Struct;NoEquality;NoComparison>]
type Cancel =
    private
    | Cancel of bool ref * children: Cancel list ref
/// Operations on `Cancel` tokens. Each function takes the `(environment, token)`
/// pair that effects are run with.
module internal Cancel =
    /// True once the token in the pair has been set.
    let isSet (_:'r,Cancel(i,_)) = !i
    /// Creates a fresh, unset token with no children.
    let create() = Cancel(ref false, ref [])
    /// Creates a child token, registers it on the token in the pair and returns
    /// the same environment paired with the child.
    let add (r:'r,Cancel(_,c)) =
        let i = create()
        // `add` is reached from continuations running on pool threads, so the
        // read-modify-write of the child list is done under a lock; otherwise two
        // concurrent registrations could overwrite each other and a child would
        // never be reached by `set`.
        lock c (fun () -> c := i::!c)
        r,i
    /// Sets the token in the pair and then, recursively, every child registered on it.
    let rec set (r,Cancel(me,kids)) =
        me := true
        List.iter (fun i -> set(r,i)) !kids
/// An effect without an error channel. It is run with an `(environment, token)`
/// pair and a continuation, which receives `Some value` or `None` when the token
/// was found set.
[<NoEquality;NoComparison>]
type UIO<'r,'a> =
    | UIO of ('r * Cancel -> ('a option -> unit) -> unit)
    /// Runs this effect, then the effect produced by `f` from its value.
    /// The token is re-checked before every step; a set token yields `None`.
    member m.Bind(f:'a->UIO<'r,'b>) : UIO<'r,'b> =
        let (UIO first) = m
        UIO (fun env cont ->
            if Cancel.isSet env then cont None
            else
                first env (fun o ->
                    match o with
                    | _ when Cancel.isSet env -> cont None
                    | None -> cont None
                    | Some a ->
                        let (UIO next) = f a
                        if Cancel.isSet env then cont None
                        else next env cont
                )
        )
module UIO = | |
/// Effect that immediately yields `a` (or `None` when the token is already set).
let result a : UIO<'r,'a> =
    UIO (fun env cont ->
        cont (if Cancel.isSet env then None else Some a)
    )
/// Effect that applies `f` to the environment. The token is checked both before
/// and after `f` runs; if it is set at either point the continuation gets `None`.
let effect (f:'r->'a) : UIO<'r,'a> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            let value = f (fst env)
            cont (if Cancel.isSet env then None else Some value)
    )
/// Transforms the value of an effect with `f`, re-checking the token around
/// each step.
let map (f:'a->'b) (UIO run) : UIO<'r,'b> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            run env (fun o ->
                match o with
                | _ when Cancel.isSet env -> cont None
                | None -> cont None
                | Some a ->
                    let b = f a
                    cont (if Cancel.isSet env then None else Some b)
            )
    )
/// Effect that completes with `()` after `milliseconds` using a one-shot
/// `System.Threading.Timer`. Yields `None` if the token is set before the timer
/// is started or when it fires.
let delay (milliseconds:int) : UIO<'r,unit> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            // The single-argument constructor creates the timer disarmed and hands
            // the timer itself to the callback as `state`. The callback therefore
            // never reads a variable that might not have been assigned yet, which
            // could happen when the timer was created already armed with a very
            // short due time.
            let timer =
                new Timer(fun state ->
                    (state :?> Timer).Dispose()
                    if Cancel.isSet env then cont None
                    else cont (Some())
                )
            // Arm it only once it is fully constructed; one shot, no period.
            timer.Change(milliseconds, Timeout.Infinite) |> ignore
    )
/// Builds an effect from the environment with `f` and runs it, checking the
/// token before building, before running and before delivering the result.
let flatten f : UIO<'r,'a> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            let (UIO inner) = f (fst env)
            if Cancel.isSet env then cont None
            else
                inner env (fun t ->
                    cont (if Cancel.isSet env then None else t)
                )
    )
/// Runs the effect with `env` and a fresh token and exposes it as an `Async`.
/// A `None` result (the effect reported cancellation) is passed to the async
/// cancellation continuation instead of dereferencing `None`, which would raise
/// a NullReferenceException inside the callback.
let toAsync (env:'r) (UIO run) : Async<'a> =
    Async.FromContinuations(fun (cont,_,cancelled) ->
        run (env,Cancel.create()) (fun o ->
            match o with
            | Some a -> cont a
            | None -> cancelled (System.OperationCanceledException())
        )
    )
// Starts `run` through `threadpool` and immediately hands `contFork` a second
// effect (the "join") that delivers the forked result to whoever runs it.
// The two sides meet in the shared slot `o`: each side tries to store its own
// value (the result, or the waiting continuation) with CompareExchange and, if
// the slot was already filled by the other side, completes the hand-off itself.
let fork (UIO run) : UIO<'r,UIO<'r,'a>> = | |
UIO (fun env contFork -> | |
if Cancel.isSet env then contFork None | |
else | |
// Shared slot: null = empty, otherwise a result or a continuation.
let mutable o = null | |
threadpool (fun _ -> | |
run env (fun a -> | |
// Worker side: publish the result unless a continuation is already waiting.
// NOTE(review): F# represents `None` as null, so a `None` result looks the
// same as an empty slot here - confirm a cancelled worker cannot leave a
// joiner waiting forever.
let o = Interlocked.CompareExchange(&o, a, null) | |
if isNull o |> not then | |
let cont = o :?> 'a Option->unit | |
if Cancel.isSet env then cont None | |
else cont a | |
) | |
) | |
// Join side; `env` here is the joiner's own environment/token pair.
UIO (fun env cont -> | |
let o = Interlocked.CompareExchange(&o, cont, null) | |
// NOTE(review): `cont` may already be stored in the slot when it is called
// with `None` here, so the worker could invoke it a second time - confirm.
if Cancel.isSet env then cont None | |
elif isNull o |> not then | |
if Cancel.isSet env then cont None | |
else cont (o :?> 'a option) | |
) | |
|> Some | |
|> contFork | |
) | |
/// Runs the effect with the supplied environment and token as an `Async`;
/// the async result is `None` when the effect reported cancellation.
let toAsyncCancel (env:'r,cancel) (UIO run) : Async<'a option> =
    Async.FromContinuations(fun (cont,_,_) -> run (env,cancel) cont)
// Wraps every effect in `ios` so that, when the wrappers are run, the underlying
// effects are started by at most `n` concurrent drain loops (presumably to bound
// parallelism - the exact guarantee depends on the original layout; confirm).
// All wrappers share `threads`, `count`, `index` and the `runCont` queue.
let throttle (n:int) (ios:UIO<'r,'a>[]) : UIO<'r,'a>[] = | |
// Number of callers currently inside the drain section.
let mutable threads = 0 | |
// Number of queue slots claimed so far.
let mutable count = 0 | |
// Index of the last queue slot taken for execution.
let mutable index = -1 | |
let runCont = Array.zeroCreate ios.Length | |
Array.map (fun (UIO run) -> | |
UIO (fun env cont -> | |
// Enqueue this effect and its continuation in arrival order.
runCont.[Interlocked.Increment(&count)-1] <- Some(run,cont) | |
let rec processLoop missing = | |
let missing = | |
if Interlocked.Increment(&threads) <= n then | |
let rec loop missing = | |
let i = Interlocked.Increment(&index) | |
if i >= count then | |
// Nothing new to take: undo the claim and retry the slots that had been
// claimed (count incremented) but not yet written when first visited.
Interlocked.Decrement(&index) |> ignore | |
List.choose (fun i -> | |
match runCont.[i] with | |
| Some(run,cont) -> | |
// NOTE(review): the queued entry is run with the *current* caller's `env`,
// not the one it was enqueued with - confirm this is intended.
if Cancel.isSet env then cont None | |
else run env cont | |
None | |
| None -> Some i | |
) missing | |
else | |
let missing = | |
match runCont.[i] with | |
| Some(run,cont) -> | |
if Cancel.isSet env then cont None | |
else run env cont | |
missing | |
| None -> i::missing | |
loop missing | |
loop missing | |
else missing | |
let t = Interlocked.Decrement(&threads) | |
// Go round again while slots are still unwritten, or when the last active
// caller leaves and unprocessed entries remain.
if List.isEmpty missing |> not || (t = 0 && index+1 < count) then | |
processLoop missing | |
processLoop [] | |
) | |
) ios | |
/// Runs all effects through `threadpool` under a child token and collects their
/// values in input order. The continuation is called once: with the full array
/// when the last effect finishes, or with `None` as soon as cancellation is seen.
let para (ios:UIO<'r,'a>[]) : UIO<'r,'a[]> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        elif Array.isEmpty ios then cont (Some Array.empty)
        else
            let envChild = Cancel.add env
            let results = Array.zeroCreate ios.Length
            // Remaining effects; swapped to -1 by whoever reports `None` first.
            let mutable count = ios.Length
            ios |> Array.iteri (fun i (UIO run) ->
                threadpool (fun _ ->
                    run envChild (function
                        | Some a ->
                            results.[i] <- a
                            if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                            elif Interlocked.Decrement(&count) = 0 then
                                cont (if Cancel.isSet env then None else Some results)
                        | None ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                    )
                )
            )
    )
/// Runs all effects through `threadpool` under a child token and adds up their
/// integer results. The continuation is called once: with the total when the
/// last effect finishes, or with `None` as soon as cancellation is seen.
let paraSum (ios:UIO<'r,int>[]) : UIO<'r,int> =
    UIO (fun env cont ->
        if Cancel.isSet env then cont None
        elif Array.isEmpty ios then cont (Some 0)
        else
            let envChild = Cancel.add env
            let mutable result = 0
            // Remaining effects; swapped to -1 by whoever reports `None` first.
            let mutable count = ios.Length
            ios |> Array.iter (fun (UIO run) ->
                threadpool (fun _ ->
                    run envChild (function
                        | Some a ->
                            Interlocked.Add(&result, a) |> ignore
                            if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                            elif Interlocked.Decrement(&count) = 0 then
                                if Cancel.isSet env then cont None
                                else cont (Some result)
                        | None ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                    )
                )
            )
    )
/// Clock operations expressed as effects, so an implementation can be swapped.
type ClockService =
    /// Effect yielding a `Time` value.
    abstract member Time : unit -> UIO<'r,Time>
    /// Effect that completes after the given number of milliseconds.
    abstract member Sleep : int -> UIO<'r,unit>
/// Implemented by environments that carry a `ClockService`.
type Clock =
    /// The clock service of this environment.
    abstract member Clock : ClockService
/// Effects that delegate to the `ClockService` found in the environment, plus a
/// live implementation of that service.
module Clock =
    /// Runs `Time()` of the environment's clock service.
    let time() =
        (fun (c:#Clock) -> c.Clock.Time())
        |> UIO.flatten
    /// Runs `Sleep milliseconds` of the environment's clock service.
    let sleep milliseconds =
        (fun (c:#Clock) -> c.Clock.Sleep milliseconds)
        |> UIO.flatten
    /// `ClockService` implemented with `Time.now` and `UIO.delay`.
    let liveService =
        { new ClockService with
            member __.Time() = UIO.result (Time.now())
            member __.Sleep milliseconds = UIO.delay milliseconds
        }
/// One step of a schedule: whether to continue (`cont`), the `delay` to apply,
/// the next schedule `state`, and a thunk producing the step's output.
[<Struct;NoEquality;NoComparison>]
type Decision<'a,'b> =
    | Decision of cont:bool * delay:int * state:'a * (unit -> 'b)
/// A schedule: an effect producing the `initial` state and an `update` function
/// that turns an input and the current state into the next `Decision`.
/// The union case is private.
[<Struct;NoEquality;NoComparison>]
type Schedule<'r,'s,'a,'b> =
    private
    | Schedule of initial:UIO<'r,'s> * update:('a * 's -> UIO<'r,Decision<'s,'b>>)
/// Constructors and combinators for `Schedule` values.
module Schedule =
    /// Always continues, without delay; state and output are the step count,
    /// starting from 0 and incremented on every update.
    let forever<'r,'a> : Schedule<'r,int,'a,int> =
        Schedule (
            UIO.result 0,
            fun (_,s) ->
                let next = s + 1
                UIO.result (Decision(true, 0, next, (fun () -> next)))
        )
    /// Rebuilds a schedule with its update function transformed by `update`.
    let private updated (update:('a * 's -> UIO<'r,Decision<'s,'b>>) -> 'a * 's -> UIO<'r,Decision<'s,'b2>>) (Schedule(initial,step)) =
        Schedule (initial, update step)
    /// Adds an extra effectful test: when the schedule wants to continue, `test`
    /// (given the input and the step output) decides whether it really does.
    let private check (test:'a * 'b -> UIO<'r,bool>) m =
        updated (fun update (a,s) ->
            update(a,s).Bind(fun (Decision(cont,dur,a1,fb) as d) ->
                if not cont then UIO.result d
                else test(a,fb()) |> UIO.map (fun b -> Decision(b,dur,a1,fb))
            )
        ) m
    /// Continues only while the predicate holds for the step output.
    let whileOutput (f:'b->bool) m = check (fun (_,b) -> b |> f |> UIO.result) m
    /// `forever`, limited to steps whose output is at most `n`.
    let recurs n = forever |> whileOutput (fun i -> i <= n)
/// An effect with an error channel. It is run with an `(environment, token)`
/// pair and a continuation, which receives `Some (Ok value)`, `Some (Error e)`,
/// or `None` when the token was found set.
[<NoEquality;NoComparison>]
type IO<'r,'a,'e> =
    | IO of ('r * Cancel -> (Result<'a,'e> option -> unit) -> unit)
    /// Runs this effect and, on `Ok`, the infallible effect produced by `f`.
    /// Errors are passed through unchanged; a set token yields `None`.
    member m.Bind(f:'a->UIO<'r,'b>) : IO<'r,'b,'e> =
        let (IO first) = m
        IO (fun env cont ->
            if Cancel.isSet env then cont None
            else
                first env (fun o ->
                    match o with
                    | _ when Cancel.isSet env -> cont None
                    | None -> cont None
                    | Some(Error e) -> cont (Some(Error e))
                    | Some(Ok a) ->
                        let (UIO next) = f a
                        if Cancel.isSet env then cont None
                        else
                            next env (fun r ->
                                cont (if Cancel.isSet env then None else Option.map Ok r)
                            )
                )
        )
    /// Runs this effect and, on `Ok`, the effect produced by `f`.
    /// Errors are passed through unchanged; a set token yields `None`.
    member m.Bind(f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> =
        let (IO first) = m
        IO (fun env cont ->
            if Cancel.isSet env then cont None
            else
                first env (fun o ->
                    match o with
                    | _ when Cancel.isSet env -> cont None
                    | None -> cont None
                    | Some(Error e) -> cont (Some(Error e))
                    | Some(Ok a) ->
                        let (IO next) = f a
                        if Cancel.isSet env then cont None
                        else next env cont
                )
        )
/// Extension: binding an infallible effect to a function that returns an `IO`.
type UIO<'r,'a> with
    /// Runs this effect, then the `IO` produced by `f` from its value.
    /// The token is re-checked before every step; a set token yields `None`.
    member m.Bind(f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> =
        let (UIO first) = m
        IO (fun env cont ->
            if Cancel.isSet env then cont None
            else
                first env (fun o ->
                    match o with
                    | _ when Cancel.isSet env -> cont None
                    | None -> cont None
                    | Some a ->
                        let (IO next) = f a
                        if Cancel.isSet env then cont None
                        else next env cont
                )
        )
module IO = | |
/// Effect that immediately succeeds with `a` (or yields `None` when the token is set).
let ok a : IO<'r,'a,'e> =
    IO (fun env cont ->
        cont (if Cancel.isSet env then None else Some (Ok a))
    )
/// Effect that immediately fails with `e` (or yields `None` when the token is set).
let error e : IO<'r,'a,'e> =
    IO (fun env cont ->
        cont (if Cancel.isSet env then None else Some (Error e))
    )
/// Effect that immediately yields the given `Result` (or `None` when the token is set).
let result a : IO<'r,'a,'e> =
    IO (fun env cont ->
        cont (if Cancel.isSet env then None else Some a)
    )
/// Effect that applies `f` to the environment to obtain a `Result`. The token is
/// checked before and after `f` runs; if set at either point the result is `None`.
let effect f : IO<'r,'a,'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            let outcome = f (fst env)
            cont (if Cancel.isSet env then None else Some outcome)
    )
/// Transforms the success value of an effect with `f`; errors and `None` pass through.
let map (f:'a->'b) (IO run) : IO<'r,'b,'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            run env (fun o ->
                if Cancel.isSet env then cont None
                else
                    let mapped = o |> Option.map (Result.map f)
                    cont (if Cancel.isSet env then None else mapped)
            )
    )
/// Transforms the error value of an effect with `f`; successes and `None` pass through.
let mapError (f:'e->'e2) (IO run) : IO<'r,'a,'e2> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            run env (fun o ->
                if Cancel.isSet env then cont None
                else
                    let mapped = o |> Option.map (Result.mapError f)
                    cont (if Cancel.isSet env then None else mapped)
            )
    )
/// Reinterprets an infallible effect whose value is a `Result` as an `IO`.
let fromUIO (UIO run:UIO<'r,Result<'a,'e>>) : IO<'r,'a,'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else run env cont
    )
/// Transforms the whole `Result` of an effect with `f`; `None` passes through.
let mapResult (f:Result<'a,'e>->Result<'b,'e2>) (IO run) : IO<'r,'b,'e2> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            run env (fun o ->
                if Cancel.isSet env then cont None
                else
                    let mapped = Option.map f o
                    cont (if Cancel.isSet env then None else mapped)
            )
    )
/// Runs the effect and continues with `succ` on `Ok` or `err` on `Error`.
/// The token is checked before running, on completion and before the follow-up.
let inline private foldM (succ:'a->IO<'r,'b,'e2>)
                         (err:'e->IO<'r,'b,'e2>)
                         (IO run) : IO<'r,'b,'e2> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            run env (fun o ->
                match o with
                | _ when Cancel.isSet env -> cont None
                | None -> cont None
                | Some outcome ->
                    let (IO next) =
                        match outcome with
                        | Ok a -> succ a
                        | Error e -> err e
                    if Cancel.isSet env then cont None
                    else next env cont
            )
    )
/// Runs `io`, retrying on error for as long as the schedule decides to continue
/// (sleeping for the decided delay when it is non-zero). When the schedule stops,
/// `orElse` is run with the last error and schedule state.
/// Success of `io` is `Choice1Of2`, the result of `orElse` is `Choice2Of2`.
let private retryOrElseEither (Schedule(initial, update))
                              (orElse:'e *'s->IO<'r,'b,'e2>)
                              (io:IO<'r,'a,'e>) : IO<'r,Choice<'a,'b>,'e2> =
    let rec loop (state:'s) : IO<'r,Choice<'a,'b>,'e2> =
        foldM
            (fun a -> result (Ok (Choice1Of2 a)))
            (fun e ->
                let decision = update(e,state)
                // `state` below is the schedule's updated state, shadowing the argument.
                decision.Bind (fun (Decision(cont,delay,state,_)) ->
                    if not cont then
                        orElse(e,state)
                        |> map Choice2Of2
                    elif delay = 0 then loop state
                    else Clock.sleep(delay).Bind(fun _ -> loop state)
                )
            )
            io
    initial.Bind loop
/// Runs `io`, retrying according to `policy`; when the policy gives up, the last
/// error is returned.
let retry (policy:Schedule<'r,'s,'e,'sb>) (io:IO<'r,'a,'e>) : IO<'r,'a,'e> =
    io
    |> retryOrElseEither policy (fun (e,_) -> result (Error e))
    |> map Choice.merge
// Starts `run` through `threadpool` and immediately hands `contFork` a second
// effect (the "join") that delivers the forked result to whoever runs it.
// The shared slot `o` holds either the finished result (Choice1Of2) or the
// waiting continuation (Choice2Of2). Whichever side arrives second sees the
// other side's value returned by CompareExchange and completes the hand-off.
let fork (IO run) : UIO<'r,IO<'r,'a,'e>> = | |
UIO (fun env contFork -> | |
if Cancel.isSet env then contFork None | |
else | |
let mutable o = None | |
threadpool (fun _ -> | |
run env (fun a -> | |
// Worker side: publish the result; if a continuation is already waiting, call it.
match Interlocked.CompareExchange(&o, Some(Choice1Of2 a), None) with | |
| Some(Choice2Of2 cont) -> | |
if Cancel.isSet env then cont None | |
else cont a | |
| _ -> () | |
) | |
) | |
// Join side; `env` here is the joiner's own environment/token pair.
// NOTE(review): the slot holds a single continuation, so this looks designed
// for the join effect to be run at most once - confirm.
IO (fun env cont -> | |
match Interlocked.CompareExchange(&o, Some(Choice2Of2 cont), None) with | |
| Some(Choice1Of2 a) -> | |
if Cancel.isSet env then cont None | |
else cont a | |
| _ -> () | |
) | |
|> Some |> contFork | |
) | |
/// Runs all effects through `threadpool` under a child token and collects their
/// values in input order. The continuation is called once: with all values when
/// the last effect succeeds, with the first reported error (after setting the
/// child token so the siblings stop), or with `None` on cancellation.
let para (ios:IO<'r,'a,'e>[]) : IO<'r,'a[],'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        elif Array.isEmpty ios then cont (Some (Ok Array.empty))
        else
            let envChild = Cancel.add env
            let results = Array.zeroCreate ios.Length
            // Remaining effects; swapped to -1 by whoever reports failure first.
            let mutable count = ios.Length
            ios |> Array.iteri (fun i (IO run) ->
                threadpool (fun _ ->
                    run envChild (function
                        | Some(Ok a) ->
                            results.[i] <- a
                            if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                            elif Interlocked.Decrement(&count) = 0 then
                                if Cancel.isSet env then cont None
                                else cont (Some (Ok results))
                        | Some(Error e) ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                Cancel.set envChild
                                if Cancel.isSet env then cont None
                                else cont (Some (Error e))
                        | None ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                    )
                )
            )
    )
/// Runs all effects through `threadpool` under a child token and adds up their
/// integer results. The continuation is called once: with the total when the
/// last effect succeeds, with the first reported error (after setting the child
/// token so the siblings stop), or with `None` on cancellation.
let paraSum (ios:IO<'r,int,'e>[]) : IO<'r,int,'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        elif Array.isEmpty ios then cont (Some (Ok 0))
        else
            let envChild = Cancel.add env
            let mutable result = 0
            // Remaining effects; swapped to -1 by whoever reports failure first.
            let mutable count = ios.Length
            ios |> Array.iter (fun (IO run) ->
                threadpool (fun _ ->
                    run envChild (function
                        | Some(Ok a) ->
                            Interlocked.Add(&result, a) |> ignore
                            if Cancel.isSet env && Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                            elif Interlocked.Decrement(&count) = 0 then
                                if Cancel.isSet env then cont None
                                else cont (Some (Ok result))
                        | Some(Error e) ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                Cancel.set envChild
                                if Cancel.isSet env then cont None
                                else cont (Some (Error e))
                        | None ->
                            if Interlocked.Exchange(&count,-1) > 0 then
                                cont None
                    )
                )
            )
    )
// Wraps every effect in `ios` so that, when the wrappers are run, the underlying
// effects are started from a shared queue by callers that pass the
// `threads <= n` gate (presumably to bound parallelism to `n` - confirm).
// All wrappers share `threads`, `indexMax`, `index` and the `runCont` queue.
let throttle (n:int) (ios:IO<'r,'a,'e>[]) : IO<'r,'a,'e>[] = | |
// Number of callers currently counted against the limit `n`.
let mutable threads = 0 | |
// Index of the last queue slot claimed for writing.
let mutable indexMax = -1 | |
// Index of the last queue slot taken for execution.
let mutable index = -1 | |
let runCont = Array.zeroCreate ios.Length | |
Array.map (fun (IO run) -> | |
IO (fun env cont -> | |
// Enqueue this effect and its continuation in arrival order.
runCont.[Interlocked.Increment(&indexMax)] <- Some(run,cont) | |
if Interlocked.Increment(&threads) <= n then | |
let rec loop missing = | |
// First retry slots that were claimed but not yet written on an earlier pass.
let missing = | |
List.choose (fun i -> | |
match runCont.[i] with | |
| Some(run,cont) -> | |
// NOTE(review): the queued entry is run with the *current* caller's `env`,
// not the one it was enqueued with - confirm this is intended.
if Cancel.isSet env then cont None | |
else run env cont | |
runCont.[i] <- None | |
None | |
| None -> Some i | |
) missing | |
// Then take the next queue index, if there is one.
let i = Interlocked.Increment(&index) | |
let missing = | |
if i <= indexMax then | |
match runCont.[i] with | |
| Some(run,cont) -> | |
if Cancel.isSet env then cont None | |
else run env cont | |
runCont.[i] <- None | |
missing | |
| None -> i::missing | |
else | |
// Nothing to take: undo the claim.
Interlocked.Decrement(&index) |> ignore | |
missing | |
if missing <> [] || index < indexMax then | |
loop missing | |
loop [] | |
// NOTE(review): the original indentation is lost in this copy; whether this
// decrement sits inside the `if` above or runs unconditionally cannot be
// determined from here - confirm against the original file.
Interlocked.Decrement(&threads) |> ignore | |
) | |
) ios | |
// Runs all effects through `threadpool` under a child token and folds their
// values with `folder`, applying them in input order: `index` is the next
// position to fold, and values that arrive early are parked in `results` until
// their turn. The continuation receives the final state, the first reported
// error, or `None` on cancellation.
let paraFold (folder:'state->'a->'state) (state:'state) (ios:IO<'r,'a,'e>[]) : IO<'r,'state,'e> = | |
IO (fun env cont -> | |
if Cancel.isSet env then cont None | |
elif Array.isEmpty ios then Ok state |> Some |> cont | |
else | |
// NOTE(review): `state`, `index` and `results` are written from the pool
// callbacks below with no synchronisation visible in this block - confirm
// that concurrent completions cannot interleave here.
let mutable state = state | |
// Effects still outstanding; swapped to -1 by whoever reports failure first.
let mutable count = ios.Length | |
let mutable index = 0 | |
let envChild = Cancel.add env | |
let results = Array.zeroCreate ios.Length | |
Array.iteri (fun i (IO run) -> | |
threadpool (fun _ -> | |
run envChild (fun a -> | |
match a with | |
| Some(Ok a) -> | |
// Folds parked values starting at position `i` until a gap (or the end)
// is reached, leaving `index` at the first position not yet folded.
let rec foldSomeMore i = | |
if i = results.Length then index <- i | |
elif Cancel.isSet env |> not then | |
match results.[i] with | |
| None -> | |
index <- i | |
| Some v -> | |
results.[i] <- None | |
state <- folder state v | |
foldSomeMore (i+1) | |
// In-order arrival is folded straight away; otherwise the value is parked.
if i = index && Cancel.isSet env |> not then | |
state <- folder state a | |
foldSomeMore (i+1) | |
else | |
results.[i] <- Some a | |
// Last effect to finish drains what is left and delivers the state.
if Interlocked.Decrement(&count) = 0 then | |
foldSomeMore index | |
if Cancel.isSet env then cont None | |
else Ok state |> Some |> cont | |
| Some(Error e) -> | |
if Interlocked.Exchange(&count,-1) >= 0 then | |
if Cancel.isSet env then cont None | |
else Error e |> Some |> cont | |
Cancel.set envChild | |
| None -> | |
if Interlocked.Exchange(&count,-1) >= 0 then | |
cont None | |
) | |
) | |
) ios | |
) | |
/// Runs the effects one after another, folding their values with `folder`
/// starting from `state`. The continuation is called exactly once: with the
/// final state, with the first error, or with `None` when cancellation is seen.
let fold (folder:'state->'a->'state) (state:'state) (ios:IO<'r,'a,'e>[]) : IO<'r,'state,'e> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        elif Array.isEmpty ios then Ok state |> Some |> cont
        else
            let mutable state = state
            // Used as a "terminal result already delivered" flag: set to -1 by the
            // first branch that reports an error or a cancellation.
            let mutable count = ios.Length
            let rec loop i =
                if i = ios.Length then
                    if Cancel.isSet env then cont None
                    else Ok state |> Some |> cont
                else
                    let (IO run) = ios.[i]
                    run env (fun a ->
                        match a with
                        | Some(Ok a) ->
                            if count >= 0 then
                                if Cancel.isSet env then
                                    // Cancellation seen after a successful step: report it
                                    // here, once, rather than dropping the continuation or
                                    // starting the next effect just to have it cancelled.
                                    if Interlocked.Exchange(&count,-1) >= 0 then
                                        cont None
                                else
                                    state <- folder state a
                                    loop (i+1)
                        | Some(Error e) ->
                            if Interlocked.Exchange(&count,-1) >= 0 then
                                if Cancel.isSet env then cont None
                                else Error e |> Some |> cont
                        | None ->
                            if Interlocked.Exchange(&count,-1) >= 0 then
                                cont None
                    )
            loop 0
    )
/// Runs both effects through `threadpool` under a shared child token. The first
/// one to finish sets the child token (so the other one stops) and delivers its
/// result: `Choice2Of2` for the infallible effect, `Choice1Of2` for the `IO`.
let race (UIO run1) (IO run2) : IO<'r,Choice<'a1,'a2>,'e1> =
    IO (fun env cont ->
        if Cancel.isSet env then cont None
        else
            let envChild = Cancel.add env
            // 0 until one side has finished, 1 afterwards.
            let mutable o = 0
            // True for exactly one caller: the first finisher, which also cancels the other side.
            let tryWin () =
                if Interlocked.Exchange(&o,1) = 0 then
                    Cancel.set envChild
                    true
                else false
            threadpool (fun _ ->
                run1 envChild (fun a ->
                    if tryWin () then
                        if Cancel.isSet env then cont None
                        else Option.map (Choice2Of2 >> Ok) a |> cont
                )
            )
            threadpool (fun _ ->
                run2 envChild (fun a ->
                    if tryWin () then
                        if Cancel.isSet env then cont None
                        else Option.map (Result.map Choice1Of2) a |> cont
                )
            )
    )
/// Races `io` against `Clock.sleep milliseconds`. The error channel becomes an
/// option: `Error None` means the sleep finished first, `Error (Some e)` is an
/// error of `io` itself.
let timeout (milliseconds:int) (io:IO<'r,'a,'e>) : IO<'r,'a,'e option> =
    IO (fun env cont ->
        let (IO run) = race (Clock.sleep milliseconds) io
        run env (fun o ->
            if Cancel.isSet env then cont None
            else
                let outcome =
                    match o with
                    | None -> None
                    | Some(Ok (Choice1Of2 a)) -> Some (Ok a)
                    | Some(Ok (Choice2Of2 ())) -> Some (Error None)
                    | Some(Error e) -> Some (Error (Some e))
                cont outcome
        )
    )
/// Runs the effect with `env` and a fresh token and exposes it as an `Async`.
/// A `None` result (the effect reported cancellation) is passed to the async
/// cancellation continuation instead of dereferencing `None`, which would raise
/// a NullReferenceException inside the callback.
let toAsync (env:'r) (IO run) : Async<Result<'a,'e>> =
    Async.FromContinuations(fun (cont,_,cancelled) ->
        run (env,Cancel.create()) (fun o ->
            match o with
            | Some r -> cont r
            | None -> cancelled (System.OperationCanceledException())
        )
    )
/// Runs the effect with the supplied environment and token as an `Async`;
/// the async result is `None` when the effect reported cancellation.
let toAsyncCancel (env:'r,cancel) (IO run) : Async<Result<'a,'e> option> =
    Async.FromContinuations(fun (cont,_,_) -> run (env,cancel) cont)
/// Computation-expression builder covering every `UIO`/`IO` combination of
/// `Bind`; each overload forwards to the matching `Bind` member of the effect.
type IOBuilder() =
    /// UIO followed by UIO.
    member inline __.Bind(io:UIO<'r,'a>, f:'a->UIO<'r,'b>) : UIO<'r,'b> = io.Bind f
    /// IO followed by UIO.
    member inline __.Bind(io:IO<'r,'a,'e>, f:'a->UIO<'r,'b>) : IO<'r,'b,'e> = io.Bind f
    /// UIO followed by IO.
    member inline __.Bind(io:UIO<'r,'a>, f:'a->IO<'r,'b,'e>) : IO<'r,'b,'e> = io.Bind f
    /// IO followed by IO.
    member inline __.Bind<'r,'a,'b,'e>(io:IO<'r,'a,'e>, f:'a->IO<'r,'b,'e>) = io.Bind<'b> f
    /// `return x` produces `UIO.result x`.
    member inline __.Return value = UIO.result value
    /// `return! e` yields the effect unchanged.
    member inline __.ReturnFrom value = value
    /// Empty branch: a `UIO` carrying the default value of its result type.
    member inline __.Zero() = UIO.result Unchecked.defaultof<_>
/// Automatically opened so that the `io { ... }` builder is available wherever
/// the namespace is opened.
[<AutoOpen>]
module IOAutoOpen =
    /// The shared `IOBuilder` instance behind `io { ... }`.
    let io = IOBuilder()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment