Created
January 25, 2021 19:42
-
-
Save EduardoRFS/0000a193a5a7b964651ef532b412ce5f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt_engine; | |
[@ocaml.warning "-3"]; | |
module Lwt_sequence = Lwt_sequence; | |
[@ocaml.warning "+3"]; | |
module Fd_map = | |
Map.Make({ | |
type t = Unix.file_descr; | |
let compare = compare; | |
}); | |
/* Type of a sleeper for the select engine. */ | |
type sleeper = { | |
mutable time: float, | |
/* The time at which the sleeper should be wakeup. */ | |
mutable stopped: bool, | |
/* [true] iff the event has been stopped. */ | |
action: unit => unit, | |
/* The action for the sleeper. */ | |
}; | |
[@ocaml.warning "-3"] | |
module Sleep_queue = | |
Lwt_pqueue.Make({ | |
type t = sleeper; | |
let compare = ({time: t1, _}, {time: t2, _}) => compare(t1, t2); | |
}); | |
let rec get_next_timeout = sleep_queue => | |
switch (Sleep_queue.lookup_min(sleep_queue)) { | |
| Some({stopped: true, _}) => | |
get_next_timeout(Sleep_queue.remove_min(sleep_queue)) | |
| Some({time, _}) => max(0., time -. Unix.gettimeofday()) | |
| None => (-1.) | |
}; | |
let bad_fd = fd => | |
try({ | |
let _ = Unix.fstat(fd); | |
false; | |
}) { | |
| Unix.Unix_error(_, _, _) => true | |
}; | |
let rec restart_actions = (sleep_queue, now) => | |
switch (Sleep_queue.lookup_min(sleep_queue)) { | |
| Some({stopped: true, _}) => | |
restart_actions(Sleep_queue.remove_min(sleep_queue), now) | |
| Some({time, action, _}) when time <= now => | |
/* We have to remove the sleeper to the queue before performing | |
the action. The action can change the sleeper's time, and this | |
might break the priority queue invariant if the sleeper is | |
still in the queue. */ | |
let q = Sleep_queue.remove_min(sleep_queue); | |
action(); | |
restart_actions(q, now); | |
| _ => sleep_queue | |
}; | |
let invoke_actions = (fd, map) => | |
switch ( | |
try(Some(Fd_map.find(fd, map))) { | |
| Not_found => None | |
} | |
) { | |
| Some(actions) => Lwt_sequence.iter_l(f => f(), actions) | |
| None => () | |
}; | |
class virtual select_or_poll_based = { | |
as _; | |
inherit class abstract; | |
val mutable sleep_queue = Sleep_queue.empty; | |
/* Threads waiting for a timeout to expire. */ | |
val mutable new_sleeps = []; | |
/* Sleepers added since the last iteration of the main loop: | |
They are not added immediately to the main sleep queue in order | |
to prevent them from being wakeup immediately. */ | |
val mutable wait_readable = Fd_map.empty; | |
/* Sequences of actions waiting for file descriptors to become | |
readable. */ | |
val mutable wait_writable = Fd_map.empty; | |
/* Sequences of actions waiting for file descriptors to become | |
writable. */ | |
pri cleanup = (); | |
pri register_timer = (delay, repeat, f) => | |
if (repeat) { | |
let rec sleeper = { | |
time: Unix.gettimeofday() +. delay, | |
stopped: false, | |
action: g, | |
} | |
and g = () => { | |
sleeper.time = Unix.gettimeofday() +. delay; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
f(); | |
}; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
lazy(sleeper.stopped = true); | |
} else { | |
let sleeper = { | |
time: Unix.gettimeofday() +. delay, | |
stopped: false, | |
action: f, | |
}; | |
new_sleeps = [sleeper, ...new_sleeps]; | |
lazy(sleeper.stopped = true); | |
}; | |
pri register_readable = (fd, f) => { | |
let actions = | |
try(Fd_map.find(fd, wait_readable)) { | |
| Not_found => | |
let actions = Lwt_sequence.create(); | |
wait_readable = Fd_map.add(fd, actions, wait_readable); | |
actions; | |
}; | |
let node = Lwt_sequence.add_l(f, actions); | |
lazy( | |
{ | |
Lwt_sequence.remove(node); | |
if (Lwt_sequence.is_empty(actions)) { | |
wait_readable = Fd_map.remove(fd, wait_readable); | |
}; | |
} | |
); | |
}; | |
pri register_writable = (fd, f) => { | |
let actions = | |
try(Fd_map.find(fd, wait_writable)) { | |
| Not_found => | |
let actions = Lwt_sequence.create(); | |
wait_writable = Fd_map.add(fd, actions, wait_writable); | |
actions; | |
}; | |
let node = Lwt_sequence.add_l(f, actions); | |
lazy( | |
{ | |
Lwt_sequence.remove(node); | |
if (Lwt_sequence.is_empty(actions)) { | |
wait_writable = Fd_map.remove(fd, wait_writable); | |
}; | |
} | |
); | |
}; | |
}; | |
module T = Domainslib.Task; | |
class virtual select_based (pool) = { | |
as self; | |
inherit class select_or_poll_based; | |
pri virtual select: | |
(list(Unix.file_descr), list(Unix.file_descr), float) => | |
(list(Unix.file_descr), list(Unix.file_descr)); | |
pub iter = block => { | |
/* Transfer all sleepers added since the last iteration to the | |
main sleep queue: */ | |
sleep_queue = | |
List.fold_left( | |
(q, e) => Sleep_queue.add(e, q), | |
sleep_queue, | |
new_sleeps, | |
); | |
new_sleeps = []; | |
/* Collect file descriptors. */ | |
let fds_r = Fd_map.fold((fd, _, l) => [fd, ...l], wait_readable, []); | |
let fds_w = Fd_map.fold((fd, _, l) => [fd, ...l], wait_writable, []); | |
/* Compute the timeout. */ | |
let timeout = | |
if (block) { | |
get_next_timeout(sleep_queue); | |
} else { | |
0.; | |
}; | |
/* Do the blocking call */ | |
let (fds_r, fds_w) = | |
try(self#select(fds_r, fds_w, timeout)) { | |
| [@implicit_arity] Unix.Unix_error(Unix.EINTR, _, _) => ([], []) | |
| [@implicit_arity] Unix.Unix_error(Unix.EBADF, _, _) => | |
/* Keeps only bad file descriptors. Actions registered on | |
them have to handle the error: */ | |
(List.filter(bad_fd, fds_r), List.filter(bad_fd, fds_w)) | |
}; | |
/* Restart threads waiting for a timeout: */ | |
sleep_queue = restart_actions(sleep_queue, Unix.gettimeofday()); | |
let sleep_queue_task = | |
T.async(pool, () => restart_actions(sleep_queue, Unix.gettimeofday())); | |
/* Restart threads waiting on a file descriptors: */ | |
let fds_r_tasks = | |
List.map( | |
fd => T.async(pool, () => invoke_actions(fd, wait_readable)), | |
fds_r, | |
); | |
let fds_w_tasks = | |
List.map( | |
fd => T.async(pool, () => invoke_actions(fd, wait_writable)), | |
fds_w, | |
); | |
sleep_queue = T.await(pool, sleep_queue_task); | |
List.iter(T.await(pool), fds_r_tasks); | |
List.iter(T.await(pool), fds_w_tasks); | |
}; | |
}; | |
[@warning "-7"] | |
class select (pool) = { | |
as _; | |
inherit (class select_based)(pool); | |
pri select = (fds_r, fds_w, timeout) => { | |
let (fds_r, fds_w, _) = Unix.select(fds_r, fds_w, [], timeout); | |
(fds_r, fds_w); | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment