Last active
September 9, 2025 06:51
-
-
Save kayceesrk/db4f7f58b10fddc223c9d83a20280fa4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type 'a state = Unfilled of task list | Filled of 'a | |
type 'a t = 'a state ref | |
let create () = ref (Unfilled []) | |
let rec try_fill promise value = | |
let old_state = !promise in | |
match old_state with | |
| Filled _ -> false | |
| Unfilled tasklist -> | |
promise := Filled value; | |
List.iter resume_task tasklist ; | |
true | |
let rec read promise = | |
match Atomic.get promise with | |
| Filled value -> value | |
| Unfilled tasklist -> | |
suspend_task (fun t -> promise := t::l) | |
(***************) | |
type t | |
val create : unit -> t | |
type _ Effect.t += Await : t -> (exn * Printexc.raw_backtrace) option Effect.t | |
val on_signal : t -> (t -> unit) -> bool | |
val signal : t -> bool | |
val is_signalled : t -> bool | |
(***************) | |
type 'a state = Unfilled of Trigger.t list | Filled of 'a | |
type 'a t = 'a state ref | |
let create () = ref (Unfilled []) | |
let rec try_fill promise value = | |
let old_state = !promise in | |
match old_state with | |
| Filled _ -> false | |
| Unfilled triglist -> | |
promise := Filled value; | |
List.iter Trigger.signal triglist ; | |
true | |
let rec read promise = | |
match Atomic.get promise with | |
| Filled value -> value | |
| Unfilled tasklist -> | |
let t = Trigger.create () in | |
promise := Unfilled (t::tasklist) ; | |
match Effect.perform (Await t) with | |
| None -> read promise | |
| _ -> failwith "TBD" | |
(*****************) | |
let run main = | |
... (* assume queue of continuations *) | |
let run_next () = | |
match dequeue () with | |
| Some k -> k () | |
| None -> () | |
in | |
let rec spawn f = | |
match f () with | |
| () -> run_next () (* value case *) | |
| effect Yield, k -> enqueue (continue k); run_next () | |
| effect (Fork f), k -> enqueue (continue k); spawn f | |
| effect Await t, k -> | |
let resume () = enqueue (continue k) in | |
Trigger.on_signal t resume |> ignore ; | |
run_next () | |
in | |
spawn main | |
(*****************) | |
type 'a state = Unfilled of Trigger.t list | Filled of 'a | |
type 'a t = 'a state Atomic.t | |
let create () = Atomic.make (Unfilled []) | |
let rec read promise = | |
match Atomic.get promise with | |
| Filled value -> value | |
| Unfilled l as before -> | |
let trigger = Trigger.create () in | |
let after = Unfilled (trigger :: l) in | |
if Atomic.compare_and_set promise before after then ( | |
match Effect.perform (Trigger.Await trigger) with | |
| None -> read promise | |
| _ -> failwith "TBD" | |
else read promise | |
(****************) | |
| effect Trigger.Await t, k -> | |
let resume () = enqueue (continue k) in | |
if Trigger.on_signal t resume then | |
(* successfully registered *) run_next () | |
else (* already signalled *) resume () | |
(**************) | |
Mutex.protect mutex (fun () -> (* lock *) | |
while condition_not_met () do | |
Condition.wait cond mutex (* unlock...lock *) | |
done) (* unlock *) | |
(**************) | |
finish (fun () -> | |
async (fun () -> ... ) ; (* T1 *) | |
async (fun () -> ... ) ; (* T2 *) | |
async (fun () -> | |
try | |
finish (fun () -> | |
async (fun () -> ... ) ; (* T3 *) | |
async (fun () -> | |
... | |
terminate(); (* cancel T3 if running *) | |
...) ; (* T4 *) | |
); (* wait for T3, T4 *) | |
with e -> ... (* handle exception from T3, T4 *) | |
); | |
); (* wait for T1, T2, and the async with T3, T4 *) | |
(**************) | |
type t | |
val create : unit -> t | |
(** [attach] and [detach] triggers *) | |
val attach : t -> Trigger.t -> bool | |
val detach : t -> Trigger.t -> bool | |
(** Signals all the attached triggers *) | |
val terminate : t -> (exn * Printexc.raw_backtrace) -> unit | |
val get_termination : t -> (exn * Printexc.raw_backtrace) option | |
(** Signals all the attached triggers *) | |
val forward : from_terminator:t -> to_terminator:t -> bool | |
(**************) | |
| effect (Trigger.Await t), k -> | |
(* Attach the trigger to the terminator when blocking *) | |
Terminator.attach terminator t |> ignore; | |
let resume () = | |
let open Effect.Deep in | |
(* Detach the trigger when waking up *) | |
Terminator.detach terminator t; | |
(* Check termination status *) | |
match Terminator.get_termination terminator with | |
| Some (exn, bt) -> (* Terminated *) | |
enqueue (fun () -> continue k (Some (exn, bt))) | |
| None -> (* Not terminated *) | |
enqueue (fun () -> continue k None) | |
in | |
if not (Trigger.on_signal t resume) then | |
(* already signalled *) resume (); | |
run_next () | |
(**************) | |
let wait condition mutex = | |
let trigger = Trigger.create () in | |
(* Add our trigger to the condition's waiters list *) | |
add_waiter trigger condition.waiters ; | |
(* Release mutex and wait *) | |
Mutex.unlock mutex; | |
(* Wait for the condition to be signaled *) | |
match Effect.perform (Trigger.Await trigger) with | |
| None -> (* Signaled normally *) | |
Mutex.lock mutex | |
| Some (exn, bt) -> (* Cancellation occurred *) | |
Mutex.lock mutex; | |
Printexc.raise_with_backtrace exn bt | |
(**************) | |
let unlock mutex = | |
let rec attempt () = | |
let before = Atomic.get mutex in | |
match before with | |
........ | |
| Locked { waiters = next_waiter :: remaining_waiters } -> | |
(* There are waiters, wake up one *) | |
let after = Locked { waiters = remaining_waiters } in | |
if Atomic.cas mutex before after then ( | |
(* Successfully updated state, now signal the next waiter *) | |
if Trigger.signal next_waiter then | |
(* Waiter signaled; owns lock *) () | |
else | |
(* Waiter already signaled; try again *) attempt ()) | |
else | |
(* CAS failed, try again *) attempt () | |
in | |
attempt () | |
(**************) | |
let shared_data = create () | |
(* Producer domain *) | |
let producer = Domain.spawn (fun () -> | |
Mux.Fifo.run (fun _ -> | |
print_endline "Producer: Started, going to sleep.."; | |
Unix.sleepf 1.0; | |
print_endline "Producer: Producing..."; | |
ignore (try_fill shared_data "Hello from producer!") | |
)) | |
(* Consumer domain *) | |
let consumer = Domain.spawn (fun () -> | |
Mux.Fifo.run (fun _ -> | |
print_endline "Consumer: Waiting..."; | |
let msg = read shared_data in | |
Printf.printf "Consumer: Got '%s'\n" msg | |
)) | |
let () = | |
Domain.join consumer; | |
Domain.join producer; | |
print_endline "✓ Successful!" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment