Last active
February 8, 2023 14:21
-
-
Save Nymphium/95d5f503165ab640df90ec4e3475fb9b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[@@@alert "-unstable"] | |
[@@@warning "-32"] | |
(* Reimplementation of Go's worker pools using Eio and Domainslib.Chan.
   https://gobyexample.com/worker-pools
*)
(** Ambient access to the Eio environment and switch through an effect, so
    callers do not have to thread them through every function. *)
module Stdenv = struct
  (** Effect answered with the [(env, sw)] pair installed by {!run}. *)
  type _ Effect.t += Get : (Eio.Stdenv.t * Eio.Switch.t) Effect.t

  (** [get ()] performs {!Get} and returns the pair supplied by the enclosing
      {!run}. Nothing in this module handles the effect outside of {!run}. *)
  let get () = Effect.perform Get

  (** [run ~env ~sw f] evaluates [f ()] under a handler that resumes every
      {!Get} with [(env, sw)]; any other effect is left unhandled here. *)
  let run ~env ~sw f =
    let open Effect.Deep in
    let handle (type a) (eff : a Effect.t) =
      match eff with
      | Get ->
        (* In this branch [a] is the pair type, so the continuation can be
           resumed with [(env, sw)]. *)
        Some (fun (k : (a, _) continuation) -> continue k (env, sw))
      | _ -> None
    in
    try_with f () { effc = handle }
  ;;
end
(** Ambient access to an [Eio.Time.clock] through an effect. *)
module Clock = struct
  (** Effect answered with the clock installed by {!run}. *)
  type _ Effect.t += Get : Eio.Time.clock Effect.t

  (** [get ()] performs {!Get} and returns the clock supplied by the
      enclosing {!run}. *)
  let get () = Effect.perform Get

  (** [sleep seconds] calls [Eio.Time.sleep] on the ambient clock. *)
  let sleep seconds =
    let clock = get () in
    Eio.Time.sleep clock seconds
  ;;

  (** [now ()] calls [Eio.Time.now] on the ambient clock. *)
  let now () = get () |> Eio.Time.now

  (** [run clock f] evaluates [f ()] under a handler that resumes every
      {!Get} with [clock]; any other effect is left unhandled here. *)
  let run (clock : Eio.Time.clock) f =
    let open Effect.Deep in
    let handle (type a) (eff : a Effect.t) : ((a, 'r) continuation -> 'r) option =
      match eff with
      | Get -> Some (fun k -> continue k clock)
      | _ -> None
    in
    try_with f () { effc = handle }
  ;;
end
(** Polling wrappers around [Domainslib.Chan]: each operation retries the
    non-blocking [*_poll] primitive, sleeping on the ambient {!Clock} and
    yielding the fiber between attempts. All functions that sleep must
    therefore run under [Clock.run]. *)
module Chan_async = struct
  module C = Domainslib.Chan

  (** [make ()] creates a bounded channel via [C.make_bounded 2]. *)
  let make () = C.make_bounded 2

  (** [recv chan] returns the next value polled from [chan], retrying after a
      0.01 sleep and a fiber yield whenever the poll yields [None]. *)
  let rec recv chan =
    match C.recv_poll chan with
    | Some v -> v
    | None ->
      Clock.sleep 0.01;
      Eio.Fiber.yield ();
      recv chan
  ;;

  (** [send chan v] retries [C.send_poll chan v] until it reports success,
      sleeping 0.01 and yielding the fiber between attempts. *)
  let rec send chan v =
    if not (C.send_poll chan v)
    then begin
      Clock.sleep 0.01;
      Eio.Fiber.yield ();
      send chan v
    end
  ;;

  (** [drain chan] receives one value from [chan] and discards it. *)
  let drain chan = ignore (recv chan)

  (** [recv_forever chan f] forks a daemon fiber on the switch obtained from
      [Stdenv.get ()]; the fiber repeatedly receives from [chan] and applies
      [f], discarding the result. Must be called under [Stdenv.run].

      Warning 21 is silenced because [loop] never returns, so the trailing
      [`Stop_daemon] is only there to satisfy [fork_daemon]'s expected
      result. *)
  let[@warning "-21"] recv_forever chan f =
    let rec loop () =
      ignore (f (recv chan));
      loop ()
    in
    let env, sw = Stdenv.get () in
    Eio.Fiber.fork_daemon ~sw (fun () ->
      (* NOTE(review): the clock handler is installed again inside the forked
         fiber, presumably because the caller's handler does not extend to
         it -- confirm against the Eio documentation. *)
      Clock.run env#clock loop;
      `Stop_daemon)
  ;;

  (** Go-flavoured operators: [chan <~ v] sends, [~> chan] receives and
      [~>! chan] receives and discards. *)
  module Syntax = struct
    let ( <~ ) = send
    let ( ~> ) = recv
    let ( ~>! ) = drain
  end
end
open Chan_async.Syntax | |
(** [worker id jobs results] starts a background consumer (via
    [Chan_async.recv_forever]) that, for each job number taken from [jobs],
    logs the start, sleeps one second on the ambient clock, logs the finish
    and sends twice the job number to [results]. *)
let worker id jobs results =
  let handle_job job =
    Printf.printf "worker %d started job %d\n" id job;
    flush_all ();
    (* Stand-in for real work. *)
    Clock.sleep 1.;
    Printf.printf "worker %d finished job %d\n" id job;
    flush_all ();
    Chan_async.send results (job * 2)
  in
  Chan_async.recv_forever jobs handle_job
;;
(** [main ()] starts three workers, submits five jobs, waits for five results
    and prints the elapsed time reported by the ambient clock. Must run under
    [Stdenv.run], since it reads the environment through [Stdenv.get]. *)
let main () =
  let env, _sw = Stdenv.get () in
  let jobs = Chan_async.make () in
  let results = Chan_async.make () in
  let worker_count = 3 in
  let job_count = 5 in
  let run_pool () =
    let started_at = Clock.now () in
    (* Worker ids and job numbers both run from 1 upwards. *)
    List.init worker_count succ |> List.iter (fun w -> worker w jobs results);
    List.init job_count succ |> List.iter (fun j -> Chan_async.send jobs j);
    (* One result is produced per job; consume them all before timing. *)
    for _ = 1 to job_count do
      Chan_async.drain results
    done;
    let elapsed = Clock.now () -. started_at in
    Printf.printf "%f sec\n" elapsed
  in
  Clock.run env#clock run_pool
;;
(* Program entry: start the Eio event loop, open a switch, and run [main]
   with both made available through [Stdenv]. *)
let () =
  Eio_main.run (fun env ->
    Eio.Switch.run (fun sw -> Stdenv.run ~env ~sw main))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ dune exec src/main.exe | |
Done: 87% (7/8, 1 left) (jobs: 1)worker 1 started job 1 | |
worker 2 started job 2 | |
worker 3 started job 3 | |
worker 1 finished job 1 | |
worker 1 started job 4 | |
worker 2 finished job 2 | |
worker 2 started job 5 | |
worker 3 finished job 3 | |
worker 1 finished job 4 | |
worker 2 finished job 5 | |
2.022990 sec |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment