Last active
October 19, 2021 01:58
-
-
Save pchiusano/f5d899a59b6a1c3de1f7518f7ad89f2b to your computer and use it in GitHub Desktop.
Cooperative threading
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
> Async.pure.run 'let | |
List.parMap (x -> x*10) [1,2,3,4,5] | |
> Async.pure.run 'let | |
use Async put fork await empty! | |
tr = empty! | |
t1 = fork '(await tr + 1) | |
t2 = fork '(put 9 tr) | |
Async.await t1 | |
> List.map (x -> x * 100) [1,2,3,4,5] | |
Async.put : a -> t a -> () | |
Async.put a t = Async.complete (Right a) t | |
use .base.M2h Scope | |
unique type PureMVar s g a | |
= PureMVar (Ref {Scope s} (Optional a)) | |
(Ref {Scope s} [Optional a ->{Scope s, g} ()]) | |
unique ability Concurrent t g where | |
empty! : t a | |
full : a -> t a | |
put : a -> t a -> () | |
tryPut : a -> t a -> Boolean | |
take : t a -> a | |
tryTake : t a -> Optional a | |
read : t a -> a | |
tryRead : t a -> Optional a | |
isEmpty : t a -> Boolean | |
fork : '{g, Concurrent t g} () -> () | |
forkInterruptible : t x -> '{g, Concurrent t g} () -> () | |
sleepMicroseconds : Nat -> () | |
unique type Concurrent.T t a = T Boolean (t (Either Failure a)) | |
Async.embed : '{g, Async t g} a -> '{Async t g} a | |
Async.embed a _ = Async.await (Async.fork a) | |
Async.pure.run : (forall t . '{Async t g, g} a) ->{g} Either Failure a | |
Async.pure.run a = | |
Concurrent.run.pure '(catch <| Async.toConcurrent (Async.embed a)) | |
Concurrent.tryRun : (forall t . '{IO,Exception,Concurrent t {IO,Exception}} a) ->{IO} Either Failure a | |
Concurrent.tryRun a = | |
catch '(run.impl (Concurrent.embed a)) | |
Concurrent.embed : '{g, Concurrent t g} a -> '{Concurrent t g} a | |
Concurrent.embed c _ = | |
r = Concurrent.empty! | |
Concurrent.fork 'let | |
a = !c | |
Concurrent.put a r | |
Concurrent.take r | |
Concurrent.run.pure : (forall t . '{Concurrent t g, g} a) ->{g} a | |
Concurrent.run.pure t = Scope.run '(run.pure.impl (Concurrent.embed t)) | |
Concurrent.run.pure.impl : (forall t . '{Concurrent t g} a) ->{Scope s, g} a | |
Concurrent.run.pure.impl prog = | |
q : Ref {Scope s} ['{Scope s, g} ()] | |
q = Scope.ref [] | |
enq : '{Scope s, g} () ->{Scope s} () | |
enq t = | |
Ref.write q (Ref.read q :+ t) | |
isFull : PureMVar s g x ->{Scope s} Boolean | |
isFull = cases PureMVar r _ -> match Ref.read r with | |
Some _ -> true | |
_ -> false | |
set : Optional x -> PureMVar s g x ->{Scope s} () | |
set o = cases PureMVar r cbs -> match Ref.read cbs with | |
listening -> | |
ws = List.map (w -> '(w o)) listening | |
Ref.write q (Ref.read q ++ ws) | |
Ref.write r o | |
Ref.write cbs [] | |
go : '{Scope s} Boolean -> (x ->{Scope s} ()) | |
-> Request {Concurrent (PureMVar s g) g} x ->{Scope s, g} () | |
go isDone putResult req39 = | |
if !isDone then () | |
else match req39 with | |
{ a } -> putResult a | |
{ Concurrent.fork a -> k } -> | |
enq '(handle !a with go isDone (const ())) | |
handle !k with go isDone putResult | |
{ Concurrent.forkInterruptible cancel a -> k } -> | |
isDone' _ = !isDone || isFull cancel | |
enq '(handle !a with go isDone' (const ())) | |
handle !k with go isDone putResult | |
{ Concurrent.full a -> k } -> | |
r = PureMVar (Scope.ref (Some a)) (Scope.ref []) | |
handle k r with go isDone putResult | |
{ Concurrent.empty! -> k } -> | |
r = PureMVar (Scope.ref None) (Scope.ref []) | |
handle k r with go isDone putResult | |
{ Concurrent.isEmpty mv -> k} -> | |
r = not (isFull mv) | |
handle k r with go isDone putResult | |
{ Concurrent.put a v@(PureMVar r waiting) -> k } -> | |
match Ref.read r with | |
None -> match Ref.read waiting with | |
[] -> | |
Ref.write r (Some a) | |
handle !k with go isDone putResult | |
_ -> | |
enq '(handle !k with go isDone putResult) | |
set (Some a) v | |
Some a0 -> | |
w = Ref.read waiting | |
Ref.write waiting (w :+ '(handle !k with go isDone putResult)) | |
{ Concurrent.tryPut a v@(PureMVar r _) -> k } -> match Ref.read r with | |
None -> | |
set (Some a) v | |
handle k true with go isDone putResult | |
Some _ -> | |
handle k false with go isDone putResult | |
{ Concurrent.take v@(PureMVar r cbs) -> k } -> | |
await : forall x2 . PureMVar s g x2 | |
-> (x2 ->{Concurrent (PureMVar s g) g} x) | |
-> Optional x2 ->{Scope s} () | |
await _ k = cases | |
None -> !step | |
Some o -> | |
enq '(handle k o with go isDone putResult) | |
set None v | |
step _ = match Ref.read r with | |
None -> | |
Ref.write cbs (Ref.read cbs :+ await v k) | |
Some a0 -> | |
Ref.write cbs (Ref.read cbs :+ await v k) | |
set (Some a0) v | |
!step | |
{ Concurrent.tryTake v@(PureMVar r cbs) -> k } -> | |
match Ref.read r with | |
None -> handle k None with go isDone putResult | |
Some a -> | |
set None v | |
handle k (Some a) with go isDone putResult | |
{ Concurrent.read v@(PureMVar r cbs) -> k } -> | |
await : forall x2 . PureMVar s g x2 | |
-> (x2 ->{Concurrent (PureMVar s g) g} x) | |
-> Optional x2 ->{Scope s} () | |
await _ k = cases | |
None -> !doIt -- go isDone putResult req | |
Some o -> enq '(handle k o with go isDone putResult) | |
doIt _ = match Ref.read r with | |
None -> Ref.write cbs (Ref.read cbs :+ await v k) | |
Some a0 -> | |
Ref.write cbs (Ref.read cbs :+ await v k) | |
set (Some a0) v | |
!doIt | |
{ Concurrent.tryRead (PureMVar r _) -> k } -> | |
a = Ref.read r | |
handle k a with go isDone putResult | |
finalResult = Scope.ref None | |
isDone _ = isSome (Ref.read finalResult) | |
drainQueue _ = | |
if !isDone then () | |
else match Ref.read q with | |
[] -> () | |
(h +: t) -> | |
Ref.write q t | |
!h | |
!drainQueue | |
handle !prog with go isDone (a -> Ref.write finalResult (Some a)) | |
!drainQueue | |
match Ref.read finalResult with | |
None -> bug "deadlock detected" | |
Some a -> a | |
Concurrent.run : (forall t . '{Concurrent t {IO,Exception}, IO, Exception} a) ->{IO, Exception} a | |
Concurrent.run a = run.impl (Concurrent.embed a) | |
Concurrent.run.impl : (forall t . '{Concurrent t {IO,Exception}} a) ->{IO, Exception} a | |
Concurrent.run.impl a = | |
forkIO : '{IO,Exception} x ->{IO} ThreadId | |
forkIO e = io.fork '(ignore (catch e)) | |
go : Request {Concurrent MVar {IO,Exception}} x ->{IO, Exception} x | |
go = cases | |
{ a } -> a | |
{ Concurrent.fork a -> k } -> | |
tid = forkIO '(handle !a with go) | |
handle !k with go | |
{ Concurrent.forkInterruptible signal a -> k } -> | |
tid = forkIO '(handle !a with go) | |
watcher = forkIO 'let | |
done = MVar.read signal | |
io.kill tid | |
handle !k with go | |
{ Concurrent.full a -> k } -> | |
r = MVar.new a | |
handle k r with go | |
{ Concurrent.empty! -> k } -> | |
r = !MVar.newEmpty | |
handle k r with go | |
{ Concurrent.isEmpty mv -> k} -> | |
r = MVar.isEmpty mv | |
handle k r with go | |
{ Concurrent.put a mv -> k } -> | |
r = MVar.put mv a | |
handle k r with go | |
{ Concurrent.tryPut a mv -> k } -> | |
r = MVar.tryPut mv a | |
handle k r with go | |
{ Concurrent.take mv -> k } -> | |
r = MVar.take mv | |
handle k r with go | |
{ Concurrent.tryTake mv -> k } -> | |
r = MVar.tryTake mv | |
handle k r with go | |
{ Concurrent.read mv -> k } -> | |
r = MVar.read mv | |
handle k r with go | |
{ Concurrent.tryRead mv -> k } -> | |
r = MVar.tryRead mv | |
handle k r with go | |
handle !a with go | |
Async.toConcurrent : (forall t . '{Async t g} a) -> '{Concurrent t g, Exception} a | |
Async.toConcurrent a _ = | |
go : t (Either Failure ()) -> Request {Async (T t) g} x ->{Concurrent t g, Exception} x | |
go parent = cases | |
{ a } -> | |
Concurrent.put (Right ()) parent | |
a | |
{Async.fail e -> k} -> | |
Concurrent.put (Right ()) parent | |
raise e | |
{Async.tryDetach a -> k} -> | |
result = Concurrent.empty! | |
Concurrent.forkInterruptible result 'let | |
e = catch '(handle !a with go Concurrent.empty!) | |
Concurrent.put e result | |
handle k (Right (T false result)) with go parent | |
{Async.tryComplete e (T readOnly t) -> k} -> | |
if readOnly then handle k (Right ()) with go parent | |
else | |
r = if Concurrent.tryPut e t then Right () | |
else Left alreadyCompleted | |
handle k r with go parent | |
{Async.tryAwait (T _ t) -> k} -> handle k (Concurrent.read t) with go parent | |
{Async.current -> k} -> handle k (T true parent) with go parent | |
{Async.tryIsComplete (T _ t) -> k} -> handle k (Right (not (Concurrent.isEmpty t))) with go parent | |
{Async.empty! -> k} -> handle k (T false Concurrent.empty!) with go parent | |
handle !a with go Concurrent.empty! | |
-- idea: could you write a handler of `Concurrent` that checks a `t x` in | |
-- between every operation? Yes, seems like it. | |
--- | |
{{ ``PureTask await complete isComplete`` }} | |
unique type PureTask s g a | |
= PureTask ('{Cooperative s g, Exception} a) | |
(Either Failure a ->{Cooperative s g, g} ()) | |
('{Cooperative s g} Boolean) | |
pure.run.impl : (∀ t. '{Async t g} a) ->{Cooperative s g, Exception, g} a | |
pure.run.impl async = | |
-- todo - think about handling of parent task | |
go : Request {Async (PureTask s g) g} x ->{Cooperative s g, Exception, g} x | |
go = cases | |
{ x } -> x | |
{ Async.fail e -> k } -> raise e | |
{ tryDetach a -> k } -> | |
result = Cooperative.fork '(handle !a with go) | |
-- suspend! | |
handle k (Right result) with go | |
handle !async with go | |
{- | |
up.async.Async.run : '{Async Async.Task {IO}} a ->{IO, Exception} a | |
up.async.Async.run a = | |
use io.MVar newEmpty | |
go : MVar (Either Failure ()) -> Request {Async Async.Task {IO}} x ->{IO, Exception} x | |
go parent = cases | |
{ a } -> | |
io.MVar.tryPut parent !Right | |
a | |
{Async.fail e -> k} -> | |
io.MVar.tryPut parent !Right | |
raise e | |
{tryDetach a -> k} -> | |
result = !newEmpty | |
tid = | |
io.fork | |
'let | |
e = catch '(handle !a with go !newEmpty) | |
unsafeRun! '(io.MVar.tryPut result e) | |
result' = fromMVar result |> onComplete '(io.kill tid) | |
handle k (Right result') with go parent | |
{Async.tryComplete e t -> k} -> handle k (catch '(Task.complete e t)) with go parent | |
{Async.tryAwait t -> k} -> handle k (catch '(Task.await t)) with go parent | |
{Async.tryParent! -> k} -> handle k (Right (fromMVarReadOnly parent)) with go parent | |
{Async.tryIsComplete t -> k} -> handle k (catch '(Task.isComplete t)) with go parent | |
{Async.empty! -> k} -> handle k !Task.empty with go parent | |
handle !a with go !newEmpty | |
-} | |
Cooperative.await : PureMVar s g (Either Failure a) ->{Cooperative s g, Exception} a | |
Cooperative.await mv = Exception.reraise (Cooperative.peek mv) | |
Cooperative.run : '{Cooperative s g} a ->{Scope s, g} a | |
Cooperative.run prog = | |
q : Ref {Scope s} ['{Scope s, g} ()] | |
q = Scope.ref [] | |
go : '{Scope s} Boolean | |
-> (x ->{Scope s} ()) | |
-> Request {Cooperative s g} x | |
-> {Scope s, g} () | |
go isDone putResult req = | |
if !isDone then () | |
else match req with | |
{ x } -> putResult x | |
{ suspend! -> k } -> Ref.write q (Ref.read q :+ (_ -> handle !k with go isDone putResult)) | |
{ fork a -> k } -> | |
result = !PureMVar.newEmpty | |
t = PureTask '(Cooperative.await result) (e -> put e result) '(PureMVar.isEmpty result) | |
handle k t with go isDone putResult | |
{ put a mv -> k } -> match impl.get mv with | |
Empty [] -> | |
impl.set (Full a []) mv | |
handle !k with go isDone putResult | |
Empty ((peek,cb) +: waiting) -> | |
cb a | |
impl.set (Empty waiting) | |
match peek with | |
Peek -> go isDone putResult req | |
Take -> () | |
Full a0 w -> | |
impl.set (Full a (w :+ (a, _ -> handle !k with go isDone putResult))) mv | |
finalResult = Scope.ref None | |
isDone _ = isSome (Ref.read finalResult) | |
drainQueue _ = | |
if !isDone then () | |
else match Ref.read q with | |
[] -> () | |
(h +: t) -> !h | |
Ref.write q t | |
!drainQueue | |
handle !prog with go isDone (a -> Ref.write finalResult (Some a)) | |
!drainQueue | |
match Ref.read finalResult with | |
None -> bug "deadlock detected" | |
Some a -> a |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
> Async.pure.run 'let | |
List.parMap (x -> x*10) [1,2,3,4,5] | |
> Async.pure.run 'let | |
use Async put fork await empty! | |
tr = empty! | |
t1 = fork '(await tr + 1) | |
t2 = fork '(put 9 tr) | |
Async.await t1 | |
> List.map (x -> x * 100) [1,2,3,4,5] | |
use .base.M2h Scope | |
unique type PureMVar s g a | |
= PureMVar (Ref {Scope s} (Optional a, [Optional a ->{Scope s, g} ()])) | |
unique ability Concurrent t g where | |
empty! : t a | |
full : a -> t a | |
put : a -> t a -> () | |
tryPut : a -> t a -> Boolean | |
take : t a -> a | |
tryTake : t a -> Optional a | |
read : t a -> a | |
tryRead : t a -> Optional a | |
isEmpty : t a -> Boolean | |
fork : '{g, Concurrent t g} () -> () | |
forkInterruptible : t x -> '{g, Concurrent t g} () -> () | |
-- | |
unique type Concurrent.T t a = T Boolean (t (Either Failure a)) | |
Async.pure.run : (forall t . '{Async t g, g} a) ->{g} Either Failure a | |
Async.pure.run a = | |
Concurrent.run.pure '(catch <| Async.toConcurrent (Async.embed a)) | |
Concurrent.tryRun : (forall t . '{IO,Exception,Concurrent t {IO,Exception}} a) ->{IO} Either Failure a | |
Concurrent.tryRun a = | |
catch '(run.impl (Concurrent.embed a)) | |
Concurrent.embed : '{g, Concurrent t g} a -> '{Concurrent t g} a | |
Concurrent.embed c _ = | |
r = Concurrent.empty! | |
Concurrent.fork 'let | |
a = !c | |
Concurrent.put a r | |
Concurrent.take r | |
Concurrent.run.pure : (forall t . '{Concurrent t g, g} a) ->{g} a | |
Concurrent.run.pure t = Scope.run '(run.pure.impl (embed t)) | |
Concurrent.run.pure.impl : (forall t . '{Concurrent t g} a) ->{Scope s, g} a | |
Concurrent.run.pure.impl prog = | |
q : Ref {Scope s} ['{Scope s, g} ()] | |
q = Scope.ref [] | |
enq : '{Scope s, g} () ->{Scope s} () | |
enq t = | |
Ref.write q (Ref.read q :+ t) | |
isFull : PureMVar s g x ->{Scope s} Boolean | |
isFull = cases PureMVar r -> match Ref.read r with | |
(Some _, _) -> true | |
_ -> false | |
set : Optional x -> PureMVar s g x ->{Scope s} () | |
set o = cases PureMVar r -> match Ref.read r with | |
(_, listening) -> | |
ws = (List.map (w -> '(w o)) listening) | |
Ref.write q (Ref.read q ++ ws) | |
Ref.write r (o, []) | |
go : '{Scope s} Boolean -> (x ->{Scope s} ()) | |
-> Request {Concurrent (PureMVar s g) g} x ->{Scope s, g} () | |
go isDone putResult req39 = | |
if !isDone then () | |
else match req39 with | |
{ a } -> putResult a | |
{ Concurrent.fork a -> k } -> | |
enq '(handle !a with go isDone (const ())) | |
handle !k with go isDone putResult | |
{ Concurrent.forkInterruptible cancel a -> k } -> | |
isDone' _ = !isDone || isFull cancel | |
enq '(handle !a with go isDone' (const ())) | |
handle !k with go isDone putResult | |
{ Concurrent.full a -> k } -> | |
r = PureMVar (Scope.ref (Some a, [])) | |
handle k r with go isDone putResult | |
{ Concurrent.empty! -> k } -> | |
r = PureMVar (Scope.ref (None, [])) | |
handle k r with go isDone putResult | |
{ Concurrent.isEmpty mv -> k} -> | |
r = not (isFull mv) | |
handle k r with go isDone putResult | |
{ Concurrent.put a v@(PureMVar mv) -> k } -> | |
match Ref.read mv with | |
(None, []) -> | |
Ref.write mv (Some a, []) | |
handle !k with go isDone putResult | |
(None, _) -> | |
enq '(handle !k with go isDone putResult) | |
set (Some a) v | |
(Some a0, w) -> | |
Ref.write mv (Some a0, w :+ '(handle !k with go isDone putResult)) | |
{ Concurrent.tryPut a v@(PureMVar mv) -> k } -> match Ref.read mv with | |
(None, ls) -> | |
set (Some a) v | |
handle k true with go isDone putResult | |
(Some _, ls) -> | |
handle k false with go isDone putResult | |
{ Concurrent.take v@(PureMVar mv) -> k } -> | |
await : forall x2 . PureMVar s g x2 | |
-> (x2 ->{Concurrent (PureMVar s g) g} x) | |
-> Optional x2 ->{Scope s} () | |
await _ k = cases | |
None -> !doIt -- go isDone putResult req | |
Some o -> enq '(handle k o with go isDone putResult) | |
doIt _ = match Ref.read mv with | |
(None, ws) -> | |
Ref.write mv (None, ws :+ await v k) | |
(Some a0, ws) -> | |
Ref.write mv (Some a0, ws :+ await v k) | |
set None v | |
!doIt | |
{ Concurrent.tryTake v@(PureMVar mv) -> k } -> | |
match Ref.read mv with | |
(None, ws) -> handle k None with go isDone putResult | |
(Some a, ws) -> | |
set None v | |
handle k (Some a) with go isDone putResult | |
{ Concurrent.read v@(PureMVar mv) -> k } -> | |
await : forall x2 . PureMVar s g x2 | |
-> (x2 ->{Concurrent (PureMVar s g) g} x) | |
-> Optional x2 ->{Scope s} () | |
await _ k = cases | |
None -> !doIt -- go isDone putResult req | |
Some o -> enq '(handle k o with go isDone putResult) | |
doIt _ = match Ref.read mv with | |
(None, ws) -> Ref.write mv (None, ws :+ await v k) | |
(Some a0, ws) -> | |
Ref.write mv (Some a0, ws :+ await v k) | |
set (Some a0) v | |
!doIt | |
{ Concurrent.tryRead v@(PureMVar mv) -> k } -> | |
match Ref.read mv with (a, ws) -> handle k a with go isDone putResult | |
finalResult = Scope.ref None | |
isDone _ = isSome (Ref.read finalResult) | |
drainQueue _ = | |
if !isDone then () | |
else match Ref.read q with | |
[] -> () | |
(h +: t) -> | |
Ref.write q t | |
!h | |
!drainQueue | |
handle !prog with go isDone (a -> Ref.write finalResult (Some a)) | |
!drainQueue | |
match Ref.read finalResult with | |
None -> bug "deadlock detected" | |
Some a -> a | |
Concurrent.run : (forall t . '{Concurrent t {IO,Exception}, IO, Exception} a) ->{IO, Exception} a | |
Concurrent.run a = run.impl (Concurrent.embed a) | |
Concurrent.run.impl : (forall t . '{Concurrent t {IO,Exception}} a) ->{IO, Exception} a | |
Concurrent.run.impl a = | |
forkIO : '{IO,Exception} x ->{IO} ThreadId | |
forkIO e = io.fork '(ignore (catch e)) | |
go : Request {Concurrent MVar {IO,Exception}} x ->{IO, Exception} x | |
go = cases | |
{ a } -> a | |
{ Concurrent.fork a -> k } -> | |
tid = forkIO '(handle !a with go) | |
handle !k with go | |
{ Concurrent.forkInterruptible signal a -> k } -> | |
tid = forkIO '(handle !a with go) | |
watcher = forkIO 'let | |
done = MVar.read signal | |
io.kill tid | |
handle !k with go | |
{ Concurrent.full a -> k } -> | |
r = MVar.new a | |
handle k r with go | |
{ Concurrent.empty! -> k } -> | |
r = !MVar.newEmpty | |
handle k r with go | |
{ Concurrent.isEmpty mv -> k} -> | |
r = MVar.isEmpty mv | |
handle k r with go | |
{ Concurrent.put a mv -> k } -> | |
r = MVar.put mv a | |
handle k r with go | |
{ Concurrent.tryPut a mv -> k } -> | |
r = MVar.tryPut mv a | |
handle k r with go | |
{ Concurrent.take mv -> k } -> | |
r = MVar.take mv | |
handle k r with go | |
{ Concurrent.tryTake mv -> k } -> | |
r = MVar.tryTake mv | |
handle k r with go | |
{ Concurrent.read mv -> k } -> | |
r = MVar.read mv | |
handle k r with go | |
{ Concurrent.tryRead mv -> k } -> | |
r = MVar.tryRead mv | |
handle k r with go | |
handle !a with go | |
Async.toConcurrent : (forall t . '{Async t g} a) -> '{Concurrent t g, Exception} a | |
Async.toConcurrent a _ = | |
go : t (Either Failure ()) -> Request {Async (T t) g} x ->{Concurrent t g, Exception} x | |
go parent = cases | |
{ a } -> | |
Concurrent.put (Right ()) parent | |
a | |
{Async.fail e -> k} -> | |
Concurrent.put (Right ()) parent | |
raise e | |
{Async.tryDetach a -> k} -> | |
result = Concurrent.empty! | |
Concurrent.forkInterruptible result 'let | |
e = catch '(handle !a with go Concurrent.empty!) | |
Concurrent.put e result | |
handle k (Right (T false result)) with go parent | |
{Async.tryComplete e (T readOnly t) -> k} -> | |
if readOnly then handle k (Right ()) with go parent | |
else | |
r = if Concurrent.tryPut e t then Right () | |
else Left alreadyCompleted | |
handle k r with go parent | |
{Async.tryAwait (T _ t) -> k} -> handle k (Concurrent.read t) with go parent | |
{Async.tryParent! -> k} -> handle k (Right (T true parent)) with go parent | |
{Async.tryIsComplete (T _ t) -> k} -> handle k (Right (not (Concurrent.isEmpty t))) with go parent | |
{Async.empty! -> k} -> handle k (T false Concurrent.empty!) with go parent | |
handle !a with go Concurrent.empty! | |
-- idea: could you write a handler of `Concurrent` that checks a `t x` in | |
-- between every operation? Yes, seems like it. | |
--- | |
{{ ``PureTask await complete isComplete`` }} | |
unique type PureTask s g a | |
= PureTask ('{Cooperative s g, Exception} a) | |
(Either Failure a ->{Cooperative s g, g} ()) | |
('{Cooperative s g} Boolean) | |
pure.run.impl : (∀ t. '{Async t g} a) ->{Cooperative s g, Exception, g} a | |
pure.run.impl async = | |
-- todo - think about handling of parent task | |
go : Request {Async (PureTask s g) g} x ->{Cooperative s g, Exception, g} x | |
go = cases | |
{ x } -> x | |
{ Async.fail e -> k } -> raise e | |
{ tryDetach a -> k } -> | |
result = Cooperative.fork '(handle !a with go) | |
-- suspend! | |
handle k (Right result) with go | |
handle !async with go | |
{- | |
up.async.Async.run : '{Async Async.Task {IO}} a ->{IO, Exception} a | |
up.async.Async.run a = | |
use io.MVar newEmpty | |
go : MVar (Either Failure ()) -> Request {Async Async.Task {IO}} x ->{IO, Exception} x | |
go parent = cases | |
{ a } -> | |
io.MVar.tryPut parent !Right | |
a | |
{Async.fail e -> k} -> | |
io.MVar.tryPut parent !Right | |
raise e | |
{tryDetach a -> k} -> | |
result = !newEmpty | |
tid = | |
io.fork | |
'let | |
e = catch '(handle !a with go !newEmpty) | |
unsafeRun! '(io.MVar.tryPut result e) | |
result' = fromMVar result |> onComplete '(io.kill tid) | |
handle k (Right result') with go parent | |
{Async.tryComplete e t -> k} -> handle k (catch '(Task.complete e t)) with go parent | |
{Async.tryAwait t -> k} -> handle k (catch '(Task.await t)) with go parent | |
{Async.tryParent! -> k} -> handle k (Right (fromMVarReadOnly parent)) with go parent | |
{Async.tryIsComplete t -> k} -> handle k (catch '(Task.isComplete t)) with go parent | |
{Async.empty! -> k} -> handle k !Task.empty with go parent | |
handle !a with go !newEmpty | |
-} | |
Cooperative.await : PureMVar s g (Either Failure a) ->{Cooperative s g, Exception} a | |
Cooperative.await mv = Exception.reraise (Cooperative.peek mv) | |
Cooperative.run : '{Cooperative s g} a ->{Scope s, g} a | |
Cooperative.run prog = | |
q : Ref {Scope s} ['{Scope s, g} ()] | |
q = Scope.ref [] | |
go : '{Scope s} Boolean | |
-> (x ->{Scope s} ()) | |
-> Request {Cooperative s g} x | |
-> {Scope s, g} () | |
go isDone putResult req = | |
if !isDone then () | |
else match req with | |
{ x } -> putResult x | |
{ suspend! -> k } -> Ref.write q (Ref.read q :+ (_ -> handle !k with go isDone putResult)) | |
{ fork a -> k } -> | |
result = !PureMVar.newEmpty | |
t = PureTask '(Cooperative.await result) (e -> put e result) '(PureMVar.isEmpty result) | |
handle k t with go isDone putResult | |
{ put a mv -> k } -> match impl.get mv with | |
Empty [] -> | |
impl.set (Full a []) mv | |
handle !k with go isDone putResult | |
Empty ((peek,cb) +: waiting) -> | |
cb a | |
impl.set (Empty waiting) | |
match peek with | |
Peek -> go isDone putResult req | |
Take -> () | |
Full a0 w -> | |
impl.set (Full a (w :+ (a, _ -> handle !k with go isDone putResult))) mv | |
finalResult = Scope.ref None | |
isDone _ = isSome (Ref.read finalResult) | |
drainQueue _ = | |
if !isDone then () | |
else match Ref.read q with | |
[] -> () | |
(h +: t) -> !h | |
Ref.write q t | |
!drainQueue | |
handle !prog with go isDone (a -> Ref.write finalResult (Some a)) | |
!drainQueue | |
match Ref.read finalResult with | |
None -> bug "deadlock detected" | |
Some a -> a |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment