Created
February 6, 2014 15:16
-
-
Save t0yv0/8846137 to your computer and use it in GitHub Desktop.
Par monad extended with deterministic available-as-soon-as-possible logging.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if INTERACTIVE | |
#else | |
namespace Examples | |
#endif | |
(* | |
[<Sealed>] | |
type Future<'T> | |
module Future = | |
val Create : unit -> Future<'T> | |
val CreateCompleted : 'T -> Future<'T> | |
val CreateFailed : exn -> Future<'T> | |
val Await : Future<'T> -> Async<'T> | |
val Fail : Future<'T> -> exn -> unit | |
val Set : Future<'T> -> 'T -> unit | |
*) | |
open System | |
open System.Threading | |
type FutureState<'T> = | |
| Completed of 'T | |
| Failed of exn | |
| Waiting of list<('T -> unit) * (exn -> unit)> | |
type Future<'T> = | |
{ | |
mutable FutureState : FutureState<'T> | |
} | |
module Future = | |
let Create () = | |
{ FutureState = Waiting [] } | |
let CreateCompleted x = | |
{ FutureState = Completed x } | |
let CreateFailed e = | |
{ FutureState = Failed e } | |
let Update fut f = | |
let rec loop i = | |
let st0 = fut.FutureState | |
let st1 = f st0 | |
if Object.ReferenceEquals(st0, Interlocked.CompareExchange(&fut.FutureState, st1, st0)) then | |
st0 | |
else | |
Thread.SpinWait(1 <<< min i 24) | |
loop (i + 1) | |
loop 0 | |
let Await fut = | |
Async.FromContinuations <| fun (ok, no, _) -> | |
let state = | |
Update fut <| fun st -> | |
match st with | |
| Completed r -> st | |
| Failed e -> st | |
| Waiting xs -> Waiting ((ok, no) :: xs) | |
match state with | |
| Completed r -> ok r | |
| Failed e -> no e | |
| Waiting _ -> () | |
let Fail fut e = | |
let state = | |
let s = Failed e | |
Update fut <| fun st -> | |
match st with | |
| Waiting _ -> s | |
| _ -> st | |
match state with | |
| Waiting xs -> for (_, k) in xs do k e | |
| _ -> failwith "Future.Fail broke contract: called more than once" | |
let Set fut x = | |
let state = | |
let s = Completed x | |
Update fut <| fun st -> | |
match st with | |
| Waiting _ -> s | |
| _ -> st | |
match state with | |
| Waiting xs -> for (k, _) in xs do k x | |
| _ -> failwith "Future.Set broke contract: called more than once" | |
(* | |
[<Sealed>] | |
type Stream<'T> | |
module Stream = | |
val Empty<'T> : Stream<'T> | |
val Append<'T> : Stream<'T> -> Stream<'T> -> Stream<'T> | |
val FromFuture : Future<Stream<'T>> -> Stream<'T> | |
val Singleton : 'T -> Stream<'T> | |
val Iterate : ('T -> unit) -> Stream<'T> -> Async<unit> | |
*) | |
type Stream<'T> = | |
| S0 | |
| S1 of 'T | |
| SApp of Stream<'T> * Stream<'T> | |
| SFut of Future<Stream<'T>> | |
module Stream = | |
let Empty<'T> : Stream<'T> = | |
S0 | |
let Singleton x = | |
S1 x | |
let FromFuture fut = | |
SFut fut | |
let Append a b = | |
SApp (a, b) | |
let rec Iterate (f: 'T -> unit) (s: Stream<'T>) : Async<unit> = | |
async { | |
match s with | |
| S0 -> return () | |
| S1 x -> return f x | |
| SApp (a, b) -> | |
do! Iterate f a | |
return! Iterate f b | |
| SFut fut -> | |
let! str = Future.Await fut | |
return! Iterate f str | |
} | |
(* | |
type Message = | |
| Message of string | |
[<Sealed>] | |
type Par<'T> | |
module Par = | |
val Return : 'T -> Par<'T> | |
val Bind : Par<'T1> -> ('T1 -> Par<'T2>) -> Par<'T2> | |
val Spawn : Par<'T> -> Par<Future<'T>> | |
val Await : Future<'T> -> Par<'T> | |
val DoAsync : Async<'T> -> Par<'T> | |
val Log : Message -> Par<unit> | |
val Start : (Message -> unit) -> Par<'T> -> Async<'T> | |
*) | |
type Message = | |
| Message of string | |
type Par<'T> = | |
| Par of (unit -> Stream<Message> * Future<'T>) | |
module Par = | |
let Def par = | |
Par par | |
let Return (x: 'T) : Par<'T> = | |
Def <| fun () -> (Stream.Empty, Future.CreateCompleted x) | |
let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> = | |
Def <| fun () -> | |
let (streamHead, x) = x () | |
let result = Future.Create() | |
let streamVar = Future.Create() | |
async { | |
try | |
let! x = Future.Await x | |
let (Par yF) = f x | |
let (yS, yV) = yF () | |
do Future.Set streamVar yS | |
let! y = Future.Await yV | |
return Future.Set result y | |
with e -> | |
return Future.Fail result e | |
} | |
|> Async.Start | |
let stream = Stream.Append streamHead (Stream.FromFuture streamVar) | |
(stream, result) | |
let Spawn (Par comp : Par<'T>) : Par<Future<'T>> = | |
Def <| fun () -> | |
let (s, v) = comp () | |
(s, Future.CreateCompleted v) | |
let DoAsync (a: Async<'T>) : Par<'T> = | |
Def <| fun () -> | |
let v = Future.Create () | |
async { | |
try | |
let! r = a | |
return Future.Set v r | |
with e -> | |
return Future.Fail v e | |
} | |
|> Async.Start | |
(Stream.Empty, v) | |
let Await (f: Future<'T>) : Par<'T> = | |
DoAsync <| async { return! Future.Await f } | |
let Log (msg: Message) : Par<unit> = | |
Def <| fun () -> | |
(Stream.Singleton msg, Future.CreateCompleted ()) | |
let Start (log: Message -> unit) (Par p : Par<'T>) : Async<'T> = | |
async { | |
let (s, v) = p () | |
do! Stream.Iterate log s | |
return! Future.Await v | |
} | |
(* | |
Example: parallelized fibonacci. | |
*) | |
module Main = | |
[<Sealed>] | |
type ParBuilder() = | |
member x.Bind(v, f) = Par.Bind v f | |
member x.Return(v) = Par.Return v | |
let par = ParBuilder() | |
let rec ParFib (n: int) : Par<int> = | |
par { | |
do! Par.Log (Message (sprintf "%i" n)) | |
do! Par.DoAsync (Async.Sleep 250) | |
match n with | |
| 0 | 1 -> | |
return 1 | |
| n -> | |
let! a = Par.Spawn (ParFib (n - 1)) | |
let! b = ParFib (n - 2) | |
let! a = Par.Await a | |
return a + b | |
} | |
let Start () = | |
async { | |
let! value = | |
ParFib 10 | |
|> Par.Start (fun (Message s) -> stdout.WriteLine(s); stdout.Flush()) | |
return printfn "Computed: %i" value | |
} | |
|> Async.RunSynchronously |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment