Skip to content

Instantly share code, notes, and snippets.

@kayceesrk
Last active September 9, 2025 06:51
Show Gist options
  • Save kayceesrk/db4f7f58b10fddc223c9d83a20280fa4 to your computer and use it in GitHub Desktop.
Save kayceesrk/db4f7f58b10fddc223c9d83a20280fa4 to your computer and use it in GitHub Desktop.
(** State of a promise: either still waiting for a value, together with the
    tasks parked on it, or already resolved with its value. *)
type 'a state =
  | Unfilled of task list
  | Filled of 'a

(** A promise is a mutable cell holding its current state. *)
type 'a t = 'a state ref

(** [create ()] returns a fresh promise that is unfilled and has no waiters. *)
let create () : 'a t = ref (Unfilled [])
(** [try_fill promise value] resolves [promise] with [value] unless it has
    already been resolved.

    On success, every task recorded in the promise's waiter list is passed to
    [resume_task], in list order.

    @return [true] if this call filled the promise; [false] if it was already
    filled, in which case [value] is dropped and no task is resumed. *)
let try_fill promise value =
  (* Not recursive in this version, so no [rec] (avoids warning 39). *)
  match !promise with
  | Filled _ -> false
  | Unfilled waiting_tasks ->
      (* Store the value before waking anyone so that resumed readers find
         the promise [Filled]. *)
      promise := Filled value;
      List.iter resume_task waiting_tasks;
      true
(** [read promise] returns the value stored in [promise].

    If the promise is still unfilled, the calling task is added to the
    promise's waiter list via [suspend_task]; once the task runs again the
    promise is inspected again.

    NOTE(review): assumes [suspend_task f] passes the current task to [f],
    parks the task, and returns [unit] when it is resumed — confirm against
    the scheduler. *)
let rec read promise =
  (* The promise is a plain [ref] here, so it is dereferenced with [!]
     rather than [Atomic.get]. *)
  match !promise with
  | Filled value -> value
  | Unfilled tasklist ->
      (* Keep the [Unfilled] wrapper when registering the new waiter. *)
      suspend_task (fun task -> promise := Unfilled (task :: tasklist));
      (* We were woken up: look at the promise again to fetch the value. *)
      read promise
(***************)
(** A trigger. The representation is abstract. *)
type t

(** [create ()] returns a new trigger.
    NOTE(review): presumably the trigger starts out un-signalled — confirm. *)
val create : unit -> t

(** [Await trigger] is the effect performed by code that wants to block on
    [trigger]. Callers in this file treat a [None] result as a normal
    wake-up and [Some (exn, backtrace)] as a cancellation to be re-raised. *)
type _ Effect.t += Await : t -> (exn * Printexc.raw_backtrace) option Effect.t

(** [on_signal trigger action] registers [action] on [trigger].
    Schedulers in this file treat [true] as "successfully registered" and
    [false] as "the trigger was already signalled". *)
val on_signal : t -> (t -> unit) -> bool

(** [signal trigger] signals [trigger].
    Callers in this file treat [true] as "this call signalled the waiter"
    and [false] as "the trigger had already been signalled". *)
val signal : t -> bool

(** [is_signalled trigger] — NOTE(review): presumably reports whether
    [trigger] has been signalled; not exercised in this file, confirm. *)
val is_signalled : t -> bool
(***************)
(** State of a promise: either still waiting for a value, together with the
    triggers of the readers blocked on it, or already resolved. *)
type 'a state =
  | Unfilled of Trigger.t list
  | Filled of 'a

(** A promise is a mutable cell holding its current state. *)
type 'a t = 'a state ref

(** [create ()] returns a fresh promise that is unfilled and has no waiters. *)
let create () : 'a t = ref (Unfilled [])
(** [try_fill promise value] resolves [promise] with [value] unless it has
    already been resolved.

    On success, every trigger recorded in the promise's waiter list is
    signalled, in list order.

    @return [true] if this call filled the promise; [false] if it was already
    filled, in which case [value] is dropped and no trigger is signalled. *)
let try_fill promise value =
  (* Not recursive in this version, so no [rec] (avoids warning 39). *)
  match !promise with
  | Filled _ -> false
  | Unfilled triggers ->
      (* Store the value before waking anyone so that resumed readers find
         the promise [Filled]. *)
      promise := Filled value;
      (* [Trigger.signal] returns a [bool], so it cannot be handed to
         [List.iter] directly; the result is deliberately ignored here. *)
      List.iter
        (fun trigger -> ignore (Trigger.signal trigger : bool))
        triggers;
      true
(** [read promise] returns the value stored in [promise].

    If the promise is still unfilled, a fresh trigger is added to the
    promise's waiter list and the caller blocks by performing
    [Trigger.Await] on it; after a normal wake-up the promise is inspected
    again.

    @raise Failure with message ["TBD"] when the await reports a
    cancellation; handling that case is still a placeholder. *)
let rec read promise =
  (* The promise is a plain [ref] here, so it is dereferenced with [!]
     rather than [Atomic.get]. *)
  match !promise with
  | Filled value -> value
  | Unfilled waiters -> (
      let trigger = Trigger.create () in
      promise := Unfilled (trigger :: waiters);
      (* The effect constructor lives in [Trigger], so it is qualified. *)
      match Effect.perform (Trigger.Await trigger) with
      | None -> read promise
      | Some _ -> failwith "TBD")
(*****************)
(** [run main] executes [main] under a cooperative scheduler built from
    effect handlers.

    Handled effects:
    - [Yield]: the current fiber is put back on the queue and the next
      queued fiber runs.
    - [Fork f]: the current fiber is queued and [f] starts running.
    - [Trigger.Await t]: the fiber is parked until [t] is signalled, then
      resumed with [None].

    The run queue ([enqueue] / [dequeue]) is elided below and holds
    [unit -> unit] thunks. *)
let run main =
  ... (* assume queue of continuations *)
  let run_next () =
    match dequeue () with
    | Some k -> k ()
    | None -> ()
  in
  let rec spawn f =
    match f () with
    | () -> run_next () (* value case *)
    | effect Yield, k -> enqueue (continue k); run_next ()
    | effect (Fork f), k -> enqueue (continue k); spawn f
    | effect (Trigger.Await t), k ->
      (* The continuation expects the result of [Await], so it is wrapped in
         a thunk that passes [None] (normal wake-up). The callback takes the
         trigger to match [Trigger.on_signal]'s [t -> unit] parameter. *)
      let resume (_ : Trigger.t) = enqueue (fun () -> continue k None) in
      (* [false] means the trigger was already signalled: the callback will
         not be called for us, so queue the fiber ourselves. *)
      if not (Trigger.on_signal t resume) then resume t;
      run_next ()
  in
  spawn main
(*****************)
(** State of a promise: either still waiting for a value, together with the
    triggers of the readers blocked on it, or already resolved. *)
type 'a state =
  | Unfilled of Trigger.t list
  | Filled of 'a

(** A promise is an atomic cell holding its current state. *)
type 'a t = 'a state Atomic.t

(** [create ()] returns a fresh promise that is unfilled and has no waiters. *)
let create () : 'a t = Atomic.make (Unfilled [])
(** [read promise] returns the value stored in [promise].

    If the promise is still unfilled, a fresh trigger is pushed onto the
    waiter list with a compare-and-set and the caller blocks by performing
    [Trigger.Await] on it. The promise is inspected again after a normal
    wake-up, and also when the compare-and-set fails.

    @raise Failure with message ["TBD"] when the await reports a
    cancellation; handling that case is still a placeholder. *)
let rec read promise =
  match Atomic.get promise with
  | Filled value -> value
  | Unfilled waiters as before ->
      let trigger = Trigger.create () in
      let after = Unfilled (trigger :: waiters) in
      if Atomic.compare_and_set promise before after then begin
        (* Registered: block until the trigger is signalled. The
           [begin ... end] delimits the inner match so that the [else]
           below attaches to the [if]. *)
        match Effect.perform (Trigger.Await trigger) with
        | None -> read promise
        | Some _ -> failwith "TBD"
      end
      else
        (* The state changed under us; start over from the current state. *)
        read promise
(****************)
| effect (Trigger.Await t), k ->
  (* Handler arm for a fiber blocking on trigger [t].
     The continuation expects the result of [Await], so it is wrapped in a
     thunk that passes [None] (normal wake-up). The callback takes the
     trigger to match [Trigger.on_signal]'s [t -> unit] parameter. *)
  let resume (_ : Trigger.t) = enqueue (fun () -> continue k None) in
  (* [true]: successfully registered, [resume] will be called on signal.
     [false]: already signalled, so queue the fiber ourselves. *)
  if not (Trigger.on_signal t resume) then resume t;
  (* In both cases hand control to the next queued fiber; otherwise a fiber
     queued by the already-signalled path would never be run from here. *)
  run_next ()
(**************)
(* Wait loop: with [mutex] held, keep waiting on [cond] for as long as
   [condition_not_met ()] is true. *)
Mutex.protect mutex (fun () -> (* lock *)
  let rec await () =
    if condition_not_met () then begin
      Condition.wait cond mutex; (* unlock...lock *)
      await ()
    end
  in
  await ()) (* unlock *)
(**************)
(* Sketch of nested task scopes (pseudo-code: [...] marks elided bodies).
   The outer [finish] starts T1, T2 and a third task; that third task opens
   an inner [finish] which starts T3 and T4. T4 calls [terminate ()], and
   exceptions coming out of the inner scope are handled by the surrounding
   [try ... with].
   NOTE(review): [finish], [async] and [terminate] are defined elsewhere;
   the waiting/cancellation behaviour described in the inline comments is
   taken from those comments only. *)
finish (fun () ->
async (fun () -> ... ) ; (* T1 *)
async (fun () -> ... ) ; (* T2 *)
async (fun () ->
try
finish (fun () ->
async (fun () -> ... ) ; (* T3 *)
async (fun () ->
...
terminate(); (* cancel T3 if running *)
...) ; (* T4 *)
); (* wait for T3, T4 *)
with e -> ... (* handle exception from T3, T4 *)
);
); (* wait for T1, T2, and the async with T3, T4 *)
(**************)
(** A terminator. The representation is abstract. *)
type t

(** [create ()] returns a new terminator. *)
val create : unit -> t

(** [attach terminator trigger] and [detach terminator trigger] add
    [trigger] to, and remove it from, [terminator].
    NOTE(review): the meaning of the [bool] results is not shown in this
    file — presumably whether the operation took effect; confirm. *)
val attach : t -> Trigger.t -> bool
val detach : t -> Trigger.t -> bool

(** [terminate terminator (exn, backtrace)] signals all the attached
    triggers. *)
val terminate : t -> (exn * Printexc.raw_backtrace) -> unit

(** [get_termination terminator] is [Some (exn, backtrace)] once
    [terminator] has been terminated and [None] otherwise, as relied on by
    the scheduler's check after a fiber wakes up. *)
val get_termination : t -> (exn * Printexc.raw_backtrace) option

(** [forward ~from_terminator ~to_terminator] — NOTE(review): presumably
    propagates termination of [from_terminator] to [to_terminator]; neither
    that nor the meaning of the [bool] result is shown in this file,
    confirm. *)
val forward : from_terminator:t -> to_terminator:t -> bool
(**************)
| effect (Trigger.Await t), k ->
  (* Handler arm for a fiber blocking on trigger [t] inside a scope governed
     by [terminator]. *)
  (* Attach the trigger to the terminator when blocking *)
  Terminator.attach terminator t |> ignore;
  (* The callback takes the trigger to match [Trigger.on_signal]'s
     [t -> unit] parameter. *)
  let resume (_ : Trigger.t) =
    let open Effect.Deep in
    (* Detach the trigger when waking up; the [bool] result is not needed,
       so it is ignored explicitly instead of being dropped by [;]. *)
    Terminator.detach terminator t |> ignore;
    (* Check termination status at wake-up time: [Some (exn, bt)] if the
       scope was terminated, [None] otherwise. This is exactly the value
       the fiber must receive from [Await]. *)
    let termination = Terminator.get_termination terminator in
    enqueue (fun () -> continue k termination)
  in
  if not (Trigger.on_signal t resume) then
    (* already signalled *) resume t;
  run_next ()
(**************)
(** [wait condition mutex] registers a fresh trigger with [condition],
    releases [mutex], and blocks on the trigger. [mutex] is locked again
    before returning or raising.

    @raise exn the exception delivered with the wake-up when the await
    reports a cancellation, re-raised with its original backtrace. *)
let wait condition mutex =
  let waiter = Trigger.create () in
  (* Register first, then release the mutex. *)
  add_waiter waiter condition.waiters;
  Mutex.unlock mutex;
  let outcome = Effect.perform (Trigger.Await waiter) in
  (* Whatever the outcome, the caller gets the mutex back. *)
  Mutex.lock mutex;
  match outcome with
  | None -> () (* signaled normally *)
  | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt (* cancelled *)
(**************)
(** [unlock mutex] releases [mutex]. The cases other than "locked with
    waiters" are elided below.

    When waiters are queued, the first one is removed from the state with a
    compare-and-set and then signalled; if that waiter turns out to have
    been signalled already, or the compare-and-set loses a race, the whole
    attempt is retried from the current state. *)
let unlock mutex =
  let rec attempt () =
    let before = Atomic.get mutex in
    match before with
    ........
    | Locked { waiters = next_waiter :: remaining_waiters } ->
      (* There are waiters, wake up one *)
      let after = Locked { waiters = remaining_waiters } in
      (* Stdlib's [Atomic] names this operation [compare_and_set]. *)
      if Atomic.compare_and_set mutex before after then begin
        (* Successfully updated state, now signal the next waiter.
           [true]: waiter signaled; it now owns the lock.
           [false]: waiter was already signaled; try again. *)
        if not (Trigger.signal next_waiter) then attempt ()
      end
      else
        (* CAS failed, try again *) attempt ()
  in
  attempt ()
(**************)
(* Promise shared by the producer and consumer domains below: the producer
   fills it with a string and the consumer reads it. *)
let shared_data = create ()
(* Producer domain: runs a scheduler whose main fiber sleeps for one second
   and then tries to fill [shared_data]. *)
let producer =
  let produce _ =
    print_endline "Producer: Started, going to sleep..";
    Unix.sleepf 1.0;
    print_endline "Producer: Producing...";
    (* Whether the fill succeeded is not needed here. *)
    try_fill shared_data "Hello from producer!" |> ignore
  in
  Domain.spawn (fun () -> Mux.Fifo.run produce)
(* Consumer domain: runs a scheduler whose main fiber reads [shared_data]
   and prints the message it received. *)
let consumer =
  let consume _ =
    print_endline "Consumer: Waiting...";
    let received = read shared_data in
    Printf.printf "Consumer: Got '%s'\n" received
  in
  Domain.spawn (fun () -> Mux.Fifo.run consume)
(* Entry point: wait for the consumer domain, then the producer domain, to
   finish, and report success once both have been joined. *)
let () =
Domain.join consumer;
Domain.join producer;
print_endline "✓ Successful!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment