Skip to content

Instantly share code, notes, and snippets.

@sargun
Created June 13, 2018 09:31
Show Gist options
  • Save sargun/0d2d259ba214f1927493eb1036b4044d to your computer and use it in GitHub Desktop.
Save sargun/0d2d259ba214f1927493eb1036b4044d to your computer and use it in GitHub Desktop.
open Unix;
open Printf;
open Sys;
open ExtUnix.All;
open Thread;
open CCBlockingQueue;
open CCTimer;
/*
* How long do we wait for the discovery deregistration
* process to complete
*/
let discovery_deregistration_timeout = 30.;
/*
* How long do we wait after successfully removing
* outselves out of discovery before we begin to
* forward signals
*/
let discovery_grace_timeout = 60.;
type exn +=
| Unknown_process_state;
let rec waitpid_loop = (pid: int) =>
/* TODO, Wrap in Misc.restart_on_EINTR */
switch (Unix.waitpid([], pid)) {
| (_, WEXITED(return_code)) =>
Log.debug("Process exited with return code: %d\n", return_code);
Pervasives.exit(return_code);
| (_, WSIGNALED(signal_code)) =>
Log.info("Process exited with signal code: %d\n", signal_code);
Pervasives.exit(128 + signal_code);
| exception (Unix.Unix_error(Unix.EINTR, "waitpid", _)) =>
Log.debug("Received unix interrupt error, restarting waitpid loop");
waitpid_loop(pid);
| _ => raise(Unknown_process_state)
};
type signal_msg =
| DiscoveryDeregistrationComplete
| DiscoveryGracePeriodComplete
| DiscoveryTimeout
| Signal(int);
type exn +=
| UnexpectedMessage(signal_msg);
let rec signal_listener_forwarder = (pid, sigq) => {
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) =>
Log.info("Forwarder processing signal: %d", sig_val);
Unix.kill(pid, sig_val);
| DiscoveryTimeout => ()
| DiscoveryDeregistrationComplete => ()
| DiscoveryGracePeriodComplete => ()
/* add arbitrary wait bit */
};
signal_listener_forwarder(pid, sigq);
};
let rec signal_listener_wait = (pid, sigq, first_signal) => {
/* Wait for some arbitrary timeout, and then forward signals, or if we get a signal,
* start forwarding all signals
*/
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) =>
Log.info(
"Going into forwarding signal mode, during wait, due to signal: %d",
sig_val
);
Unix.kill(pid, first_signal);
Unix.kill(pid, sig_val);
ignore(signal_listener_forwarder(pid, sigq));
| DiscoveryGracePeriodComplete =>
Log.info("Going into forwarding signal mode, wait is completed");
Unix.kill(pid, first_signal);
ignore(signal_listener_forwarder(pid, sigq));
/* Both of these messages can come in late */
| DiscoveryTimeout => ()
| DiscoveryDeregistrationComplete => ()
/* add arbitrary wait bit */
};
signal_listener_wait(pid, sigq, first_signal);
};
let signal_listener_phase1 = (pid, sigq, first_signal, discovery_timer) =>
/* TODO: kickoff discovery registration */
switch (CCBlockingQueue.take(sigq)) {
| DiscoveryTimeout =>
Log.error("Received discovery timeout");
signal_listener_wait(pid, sigq, first_signal);
| DiscoveryDeregistrationComplete =>
Log.info("Discover deregistration completed");
CCTimer.stop(discovery_timer);
/*
* Even though we stopped the timer here,
* we still might get a message from it
* since it's async
*/
let grace_period_timer = CCTimer.create();
CCTimer.after(grace_period_timer, discovery_grace_timeout, () =>
assert (CCBlockingQueue.try_push(sigq, DiscoveryGracePeriodComplete))
);
signal_listener_wait(pid, sigq, first_signal);
| Signal(sig_val) =>
Log.info(
"Going into forwarding signal mode, during discovery de-registration, due to 2nd signal: %d",
sig_val
);
CCTimer.stop(discovery_timer);
Unix.kill(pid, first_signal);
Unix.kill(pid, sig_val);
signal_listener_forwarder(pid, sigq);
| e => raise(UnexpectedMessage(e))
/* Add successful discovery completion here */
/* add wait for discovery bit */
};
let discovery_deregistration = sigq => {
Log.info("Beginning discovery deregistration");
Unix.sleep(5);
assert (CCBlockingQueue.try_push(sigq, DiscoveryDeregistrationComplete));
};
let signal_listener_thread = (pid, sigq) => {
/*
* In this state, the loop is just listening, and waiting for a signal.
* Once we receive a signal, we kick off deregistration in discovery,
* and we run that with timeout N. Either timeout N must elapse, or
* the discovery deregistration must finish. Once that happens,
* we forward the signal that we last received.
*
* If at any point, during this we receive another signal,
* all bets are off, and we immediately start forwarding
* signals.
*/
let sig_val =
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) => sig_val
| e => raise(UnexpectedMessage(e))
};
let _ = Thread.create((_) => discovery_deregistration(sigq), ());
let timer = CCTimer.create();
CCTimer.after(timer, discovery_deregistration_timeout, () =>
assert (CCBlockingQueue.try_push(sigq, DiscoveryTimeout))
);
signal_listener_phase1(pid, sigq, sig_val, timer);
};
let rec signal_cb_thread = (sigq, signals) => {
let my_sig = Thread.wait_signal(signals);
assert (CCBlockingQueue.try_push(sigq, Signal(my_sig)));
signal_cb_thread(sigq, signals);
};
let parent = (pid: int, signals) => {
let sigq = CCBlockingQueue.create(max_int);
let _ = Thread.create((_) => ignore(signal_listener_thread(pid, sigq)), ());
let _ = Thread.create((_) => ignore(signal_cb_thread(sigq, signals)), ());
waitpid_loop(pid);
};
let child = () => {
/* Replace with real child execution code */
Sys.set_signal(
Sys.sigint,
Signal_handle((_: int) => Log.info("Saw SIGINT"))
);
Sys.set_signal(
Sys.sigterm,
Signal_handle((_: int) => Log.info("Saw SIGTERM"))
);
let _ = Unix.sigprocmask(SIG_UNBLOCK, [Sys.sigint, Sys.sigterm]);
ExtUnix.All.setpgid(0, 0);
Log.info("In child");
let _ = Unix.sleep(1000);
Log.info("Done sleeping");
Pervasives.exit(1);
};
let () = {
Log.set_log_level(Log.DEBUG);
Log.set_output(Pervasives.stderr);
Log.color_on();
let signals = [Sys.sigint, Sys.sigterm];
let _ = Unix.sigprocmask(SIG_BLOCK, signals);
switch (Unix.fork()) {
| 0 => child()
| pid => parent(pid, signals)
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment