> Async.pure.run 'let List.parMap (x -> x*10) [1,2,3,4,5] > Async.pure.run 'let use Async put fork await empty! tr = empty! t1 = fork '(await tr + 1) t2 = fork '(put 9 tr) Async.await t1 > List.map (x -> x * 100) [1,2,3,4,5] Async.put : a -> t a -> () Async.put a t = Async.complete (Right a) t use .base.M2h Scope unique type PureMVar s g a = PureMVar (Ref {Scope s} (Optional a)) (Ref {Scope s} [Optional a ->{Scope s, g} ()]) unique ability Concurrent t g where empty! : t a full : a -> t a put : a -> t a -> () tryPut : a -> t a -> Boolean take : t a -> a tryTake : t a -> Optional a read : t a -> a tryRead : t a -> Optional a isEmpty : t a -> Boolean fork : '{g, Concurrent t g} () -> () forkInterruptible : t x -> '{g, Concurrent t g} () -> () sleepMicroseconds : Nat -> () unique type Concurrent.T t a = T Boolean (t (Either Failure a)) Async.embed : '{g, Async t g} a -> '{Async t g} a Async.embed a _ = Async.await (Async.fork a) Async.pure.run : (forall t . '{Async t g, g} a) ->{g} Either Failure a Async.pure.run a = Concurrent.run.pure '(catch <| Async.toConcurrent (Async.embed a)) Concurrent.tryRun : (forall t . '{IO,Exception,Concurrent t {IO,Exception}} a) ->{IO} Either Failure a Concurrent.tryRun a = catch '(run.impl (Concurrent.embed a)) Concurrent.embed : '{g, Concurrent t g} a -> '{Concurrent t g} a Concurrent.embed c _ = r = Concurrent.empty! Concurrent.fork 'let a = !c Concurrent.put a r Concurrent.take r Concurrent.run.pure : (forall t . '{Concurrent t g, g} a) ->{g} a Concurrent.run.pure t = Scope.run '(run.pure.impl (Concurrent.embed t)) Concurrent.run.pure.impl : (forall t . '{Concurrent t g} a) ->{Scope s, g} a Concurrent.run.pure.impl prog = q : Ref {Scope s} ['{Scope s, g} ()] q = Scope.ref [] enq : '{Scope s, g} () ->{Scope s} () enq t = Ref.write q (Ref.read q :+ t) isFull : PureMVar s g x ->{Scope s} Boolean isFull = cases PureMVar r _ -> match Ref.read r with Some _ -> true _ -> false set : Optional x -> PureMVar s g x ->{Scope s} () set o = cases PureMVar r cbs -> match Ref.read cbs with listening -> ws = List.map (w -> '(w o)) listening Ref.write q (Ref.read q ++ ws) Ref.write r o Ref.write cbs [] go : '{Scope s} Boolean -> (x ->{Scope s} ()) -> Request {Concurrent (PureMVar s g) g} x ->{Scope s, g} () go isDone putResult req39 = if !isDone then () else match req39 with { a } -> putResult a { Concurrent.fork a -> k } -> enq '(handle !a with go isDone (const ())) handle !k with go isDone putResult { Concurrent.forkInterruptible cancel a -> k } -> isDone' _ = !isDone || isFull cancel enq '(handle !a with go isDone' (const ())) handle !k with go isDone putResult { Concurrent.full a -> k } -> r = PureMVar (Scope.ref (Some a)) (Scope.ref []) handle k r with go isDone putResult { Concurrent.empty! -> k } -> r = PureMVar (Scope.ref None) (Scope.ref []) handle k r with go isDone putResult { Concurrent.isEmpty mv -> k} -> r = not (isFull mv) handle k r with go isDone putResult { Concurrent.put a v@(PureMVar r waiting) -> k } -> match Ref.read r with None -> match Ref.read waiting with [] -> Ref.write r (Some a) handle !k with go isDone putResult _ -> enq '(handle !k with go isDone putResult) set (Some a) v Some a0 -> w = Ref.read waiting Ref.write waiting (w :+ '(handle !k with go isDone putResult)) { Concurrent.tryPut a v@(PureMVar r _) -> k } -> match Ref.read r with None -> set (Some a) v handle k true with go isDone putResult Some _ -> handle k false with go isDone putResult { Concurrent.take v@(PureMVar r cbs) -> k } -> await : forall x2 . PureMVar s g x2 -> (x2 ->{Concurrent (PureMVar s g) g} x) -> Optional x2 ->{Scope s} () await _ k = cases None -> !step Some o -> enq '(handle k o with go isDone putResult) set None v step _ = match Ref.read r with None -> Ref.write cbs (Ref.read cbs :+ await v k) Some a0 -> Ref.write cbs (Ref.read cbs :+ await v k) set (Some a0) v !step { Concurrent.tryTake v@(PureMVar r cbs) -> k } -> match Ref.read r with None -> handle k None with go isDone putResult Some a -> set None v handle k (Some a) with go isDone putResult { Concurrent.read v@(PureMVar r cbs) -> k } -> await : forall x2 . PureMVar s g x2 -> (x2 ->{Concurrent (PureMVar s g) g} x) -> Optional x2 ->{Scope s} () await _ k = cases None -> !doIt -- go isDone putResult req Some o -> enq '(handle k o with go isDone putResult) doIt _ = match Ref.read r with None -> Ref.write cbs (Ref.read cbs :+ await v k) Some a0 -> Ref.write cbs (Ref.read cbs :+ await v k) set (Some a0) v !doIt { Concurrent.tryRead (PureMVar r _) -> k } -> a = Ref.read r handle k a with go isDone putResult finalResult = Scope.ref None isDone _ = isSome (Ref.read finalResult) drainQueue _ = if !isDone then () else match Ref.read q with [] -> () (h +: t) -> Ref.write q t !h !drainQueue handle !prog with go isDone (a -> Ref.write finalResult (Some a)) !drainQueue match Ref.read finalResult with None -> bug "deadlock detected" Some a -> a Concurrent.run : (forall t . '{Concurrent t {IO,Exception}, IO, Exception} a) ->{IO, Exception} a Concurrent.run a = run.impl (Concurrent.embed a) Concurrent.run.impl : (forall t . '{Concurrent t {IO,Exception}} a) ->{IO, Exception} a Concurrent.run.impl a = forkIO : '{IO,Exception} x ->{IO} ThreadId forkIO e = io.fork '(ignore (catch e)) go : Request {Concurrent MVar {IO,Exception}} x ->{IO, Exception} x go = cases { a } -> a { Concurrent.fork a -> k } -> tid = forkIO '(handle !a with go) handle !k with go { Concurrent.forkInterruptible signal a -> k } -> tid = forkIO '(handle !a with go) watcher = forkIO 'let done = MVar.read signal io.kill tid handle !k with go { Concurrent.full a -> k } -> r = MVar.new a handle k r with go { Concurrent.empty! -> k } -> r = !MVar.newEmpty handle k r with go { Concurrent.isEmpty mv -> k} -> r = MVar.isEmpty mv handle k r with go { Concurrent.put a mv -> k } -> r = MVar.put mv a handle k r with go { Concurrent.tryPut a mv -> k } -> r = MVar.tryPut mv a handle k r with go { Concurrent.take mv -> k } -> r = MVar.take mv handle k r with go { Concurrent.tryTake mv -> k } -> r = MVar.tryTake mv handle k r with go { Concurrent.read mv -> k } -> r = MVar.read mv handle k r with go { Concurrent.tryRead mv -> k } -> r = MVar.tryRead mv handle k r with go handle !a with go Async.toConcurrent : (forall t . '{Async t g} a) -> '{Concurrent t g, Exception} a Async.toConcurrent a _ = go : t (Either Failure ()) -> Request {Async (T t) g} x ->{Concurrent t g, Exception} x go parent = cases { a } -> Concurrent.put (Right ()) parent a {Async.fail e -> k} -> Concurrent.put (Right ()) parent raise e {Async.tryDetach a -> k} -> result = Concurrent.empty! Concurrent.forkInterruptible result 'let e = catch '(handle !a with go Concurrent.empty!) Concurrent.put e result handle k (Right (T false result)) with go parent {Async.tryComplete e (T readOnly t) -> k} -> if readOnly then handle k (Right ()) with go parent else r = if Concurrent.tryPut e t then Right () else Left alreadyCompleted handle k r with go parent {Async.tryAwait (T _ t) -> k} -> handle k (Concurrent.read t) with go parent {Async.current -> k} -> handle k (T true parent) with go parent {Async.tryIsComplete (T _ t) -> k} -> handle k (Right (not (Concurrent.isEmpty t))) with go parent {Async.empty! -> k} -> handle k (T false Concurrent.empty!) with go parent handle !a with go Concurrent.empty! -- idea: could you write a handler of `Concurrent` that checks a `t x` in -- between every operation? Yes, seems like it. --- {{ ``PureTask await complete isComplete`` }} unique type PureTask s g a = PureTask ('{Cooperative s g, Exception} a) (Either Failure a ->{Cooperative s g, g} ()) ('{Cooperative s g} Boolean) pure.run.impl : (∀ t. '{Async t g} a) ->{Cooperative s g, Exception, g} a pure.run.impl async = -- todo - think about handling of parent task go : Request {Async (PureTask s g) g} x ->{Cooperative s g, Exception, g} x go = cases { x } -> x { Async.fail e -> k } -> raise e { tryDetach a -> k } -> result = Cooperative.fork '(handle !a with go) -- suspend! handle k (Right result) with go handle !async with go {- up.async.Async.run : '{Async Async.Task {IO}} a ->{IO, Exception} a up.async.Async.run a = use io.MVar newEmpty go : MVar (Either Failure ()) -> Request {Async Async.Task {IO}} x ->{IO, Exception} x go parent = cases { a } -> io.MVar.tryPut parent !Right a {Async.fail e -> k} -> io.MVar.tryPut parent !Right raise e {tryDetach a -> k} -> result = !newEmpty tid = io.fork 'let e = catch '(handle !a with go !newEmpty) unsafeRun! '(io.MVar.tryPut result e) result' = fromMVar result |> onComplete '(io.kill tid) handle k (Right result') with go parent {Async.tryComplete e t -> k} -> handle k (catch '(Task.complete e t)) with go parent {Async.tryAwait t -> k} -> handle k (catch '(Task.await t)) with go parent {Async.tryParent! -> k} -> handle k (Right (fromMVarReadOnly parent)) with go parent {Async.tryIsComplete t -> k} -> handle k (catch '(Task.isComplete t)) with go parent {Async.empty! -> k} -> handle k !Task.empty with go parent handle !a with go !newEmpty -} Cooperative.await : PureMVar s g (Either Failure a) ->{Cooperative s g, Exception} a Cooperative.await mv = Exception.reraise (Cooperative.peek mv) Cooperative.run : '{Cooperative s g} a ->{Scope s, g} a Cooperative.run prog = q : Ref {Scope s} ['{Scope s, g} ()] q = Scope.ref [] go : '{Scope s} Boolean -> (x ->{Scope s} ()) -> Request {Cooperative s g} x -> {Scope s, g} () go isDone putResult req = if !isDone then () else match req with { x } -> putResult x { suspend! -> k } -> Ref.write q (Ref.read q :+ (_ -> handle !k with go isDone putResult)) { fork a -> k } -> result = !PureMVar.newEmpty t = PureTask '(Cooperative.await result) (e -> put e result) '(PureMVar.isEmpty result) handle k t with go isDone putResult { put a mv -> k } -> match impl.get mv with Empty [] -> impl.set (Full a []) mv handle !k with go isDone putResult Empty ((peek,cb) +: waiting) -> cb a impl.set (Empty waiting) match peek with Peek -> go isDone putResult req Take -> () Full a0 w -> impl.set (Full a (w :+ (a, _ -> handle !k with go isDone putResult))) mv finalResult = Scope.ref None isDone _ = isSome (Ref.read finalResult) drainQueue _ = if !isDone then () else match Ref.read q with [] -> () (h +: t) -> !h Ref.write q t !drainQueue handle !prog with go isDone (a -> Ref.write finalResult (Some a)) !drainQueue match Ref.read finalResult with None -> bug "deadlock detected" Some a -> a