Created
June 13, 2018 09:31
-
-
Save sargun/0d2d259ba214f1927493eb1036b4044d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Unix; | |
open Printf; | |
open Sys; | |
open ExtUnix.All; | |
open Thread; | |
open CCBlockingQueue; | |
open CCTimer; | |
/* | |
* How long do we wait for the discovery deregistration | |
* process to complete | |
*/ | |
let discovery_deregistration_timeout = 30.; | |
/* | |
* How long do we wait after successfully removing | |
* outselves out of discovery before we begin to | |
* forward signals | |
*/ | |
let discovery_grace_timeout = 60.; | |
type exn += | |
| Unknown_process_state; | |
let rec waitpid_loop = (pid: int) => | |
/* TODO, Wrap in Misc.restart_on_EINTR */ | |
switch (Unix.waitpid([], pid)) { | |
| (_, WEXITED(return_code)) => | |
Log.debug("Process exited with return code: %d\n", return_code); | |
Pervasives.exit(return_code); | |
| (_, WSIGNALED(signal_code)) => | |
Log.info("Process exited with signal code: %d\n", signal_code); | |
Pervasives.exit(128 + signal_code); | |
| exception (Unix.Unix_error(Unix.EINTR, "waitpid", _)) => | |
Log.debug("Received unix interrupt error, restarting waitpid loop"); | |
waitpid_loop(pid); | |
| _ => raise(Unknown_process_state) | |
}; | |
type signal_msg = | |
| DiscoveryDeregistrationComplete | |
| DiscoveryGracePeriodComplete | |
| DiscoveryTimeout | |
| Signal(int); | |
type exn += | |
| UnexpectedMessage(signal_msg); | |
let rec signal_listener_forwarder = (pid, sigq) => { | |
switch (CCBlockingQueue.take(sigq)) { | |
| Signal(sig_val) => | |
Log.info("Forwarder processing signal: %d", sig_val); | |
Unix.kill(pid, sig_val); | |
| DiscoveryTimeout => () | |
| DiscoveryDeregistrationComplete => () | |
| DiscoveryGracePeriodComplete => () | |
/* add arbitrary wait bit */ | |
}; | |
signal_listener_forwarder(pid, sigq); | |
}; | |
let rec signal_listener_wait = (pid, sigq, first_signal) => { | |
/* Wait for some arbitrary timeout, and then forward signals, or if we get a signal, | |
* start forwarding all signals | |
*/ | |
switch (CCBlockingQueue.take(sigq)) { | |
| Signal(sig_val) => | |
Log.info( | |
"Going into forwarding signal mode, during wait, due to signal: %d", | |
sig_val | |
); | |
Unix.kill(pid, first_signal); | |
Unix.kill(pid, sig_val); | |
ignore(signal_listener_forwarder(pid, sigq)); | |
| DiscoveryGracePeriodComplete => | |
Log.info("Going into forwarding signal mode, wait is completed"); | |
Unix.kill(pid, first_signal); | |
ignore(signal_listener_forwarder(pid, sigq)); | |
/* Both of these messages can come in late */ | |
| DiscoveryTimeout => () | |
| DiscoveryDeregistrationComplete => () | |
/* add arbitrary wait bit */ | |
}; | |
signal_listener_wait(pid, sigq, first_signal); | |
}; | |
let signal_listener_phase1 = (pid, sigq, first_signal, discovery_timer) => | |
/* TODO: kickoff discovery registration */ | |
switch (CCBlockingQueue.take(sigq)) { | |
| DiscoveryTimeout => | |
Log.error("Received discovery timeout"); | |
signal_listener_wait(pid, sigq, first_signal); | |
| DiscoveryDeregistrationComplete => | |
Log.info("Discover deregistration completed"); | |
CCTimer.stop(discovery_timer); | |
/* | |
* Even though we stopped the timer here, | |
* we still might get a message from it | |
* since it's async | |
*/ | |
let grace_period_timer = CCTimer.create(); | |
CCTimer.after(grace_period_timer, discovery_grace_timeout, () => | |
assert (CCBlockingQueue.try_push(sigq, DiscoveryGracePeriodComplete)) | |
); | |
signal_listener_wait(pid, sigq, first_signal); | |
| Signal(sig_val) => | |
Log.info( | |
"Going into forwarding signal mode, during discovery de-registration, due to 2nd signal: %d", | |
sig_val | |
); | |
CCTimer.stop(discovery_timer); | |
Unix.kill(pid, first_signal); | |
Unix.kill(pid, sig_val); | |
signal_listener_forwarder(pid, sigq); | |
| e => raise(UnexpectedMessage(e)) | |
/* Add successful discovery completion here */ | |
/* add wait for discovery bit */ | |
}; | |
let discovery_deregistration = sigq => { | |
Log.info("Beginning discovery deregistration"); | |
Unix.sleep(5); | |
assert (CCBlockingQueue.try_push(sigq, DiscoveryDeregistrationComplete)); | |
}; | |
let signal_listener_thread = (pid, sigq) => { | |
/* | |
* In this state, the loop is just listening, and waiting for a signal. | |
* Once we receive a signal, we kick off deregistration in discovery, | |
* and we run that with timeout N. Either timeout N must elapse, or | |
* the discovery deregistration must finish. Once that happens, | |
* we forward the signal that we last received. | |
* | |
* If at any point, during this we receive another signal, | |
* all bets are off, and we immediately start forwarding | |
* signals. | |
*/ | |
let sig_val = | |
switch (CCBlockingQueue.take(sigq)) { | |
| Signal(sig_val) => sig_val | |
| e => raise(UnexpectedMessage(e)) | |
}; | |
let _ = Thread.create((_) => discovery_deregistration(sigq), ()); | |
let timer = CCTimer.create(); | |
CCTimer.after(timer, discovery_deregistration_timeout, () => | |
assert (CCBlockingQueue.try_push(sigq, DiscoveryTimeout)) | |
); | |
signal_listener_phase1(pid, sigq, sig_val, timer); | |
}; | |
let rec signal_cb_thread = (sigq, signals) => { | |
let my_sig = Thread.wait_signal(signals); | |
assert (CCBlockingQueue.try_push(sigq, Signal(my_sig))); | |
signal_cb_thread(sigq, signals); | |
}; | |
let parent = (pid: int, signals) => { | |
let sigq = CCBlockingQueue.create(max_int); | |
let _ = Thread.create((_) => ignore(signal_listener_thread(pid, sigq)), ()); | |
let _ = Thread.create((_) => ignore(signal_cb_thread(sigq, signals)), ()); | |
waitpid_loop(pid); | |
}; | |
let child = () => { | |
/* Replace with real child execution code */ | |
Sys.set_signal( | |
Sys.sigint, | |
Signal_handle((_: int) => Log.info("Saw SIGINT")) | |
); | |
Sys.set_signal( | |
Sys.sigterm, | |
Signal_handle((_: int) => Log.info("Saw SIGTERM")) | |
); | |
let _ = Unix.sigprocmask(SIG_UNBLOCK, [Sys.sigint, Sys.sigterm]); | |
ExtUnix.All.setpgid(0, 0); | |
Log.info("In child"); | |
let _ = Unix.sleep(1000); | |
Log.info("Done sleeping"); | |
Pervasives.exit(1); | |
}; | |
let () = { | |
Log.set_log_level(Log.DEBUG); | |
Log.set_output(Pervasives.stderr); | |
Log.color_on(); | |
let signals = [Sys.sigint, Sys.sigterm]; | |
let _ = Unix.sigprocmask(SIG_BLOCK, signals); | |
switch (Unix.fork()) { | |
| 0 => child() | |
| pid => parent(pid, signals) | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment