Last active
August 29, 2015 14:10
-
-
Save israelshirk/c7388b88ef3de6cdf927 to your computer and use it in GitHub Desktop.
Daemon wrapper for lagging signals
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| import daemon | |
| import grp | |
| import os | |
| import signal | |
| import subprocess | |
| import sys | |
| import time | |
class ProcessPassthrough(object):
    """
    Gateway to a child process that delays selected signals.

    Signals listed in ``signals_to_delay`` are held for ``delay`` seconds
    before being forwarded to the child, so that the wrapped process stays
    alive until its cluster membership has been removed gracefully by other
    services receiving the same set of signals from supervisord/systemd/etc.
    Every other handled signal is forwarded to the child immediately.

    The default delay is 30s - a bit overkill, but a good guarantee that
    messages propagate appropriately in the context of unreliable message
    distribution and during failover scenarios.
    """

    # Signals we want to lag to our child so that other processes can remove
    # its membership from the relevant coordination service (and thus divert
    # incoming requests) before it actually shuts down.
    signals_to_delay = [
        signal.SIGABRT,
        signal.SIGALRM,
        signal.SIGINT,
        signal.SIGQUIT,
        signal.SIGTERM,
    ]

    # Amount of time (seconds) by which to lag the above-enumerated signals.
    delay = 30

    # Interval (seconds) to sleep between polls of the child process.
    sleep_interval = 1 / 10.0

    # Placeholder for the process reference from subprocess.Popen().
    process = None

    def __init__(self, command, context=None):
        """
        :param command: argv list for the child process, passed to Popen.
        :param context: daemon context to run inside; a fresh
            ``daemon.DaemonContext()`` is created when omitted.
        """
        self.command = command

        # FIFO of [timestamp, signum] entries waiting to be forwarded to the
        # child. Created per instance: a class-level list would be shared by
        # every ProcessPassthrough object.
        self.signal_queue = []

        if context is not None:
            self.context = context
        else:
            self.context = daemon.DaemonContext()

        # We actually want these to pipe out for standard Docker/Supervisord
        # logging - things underneath may be running in a context that's a mix
        # of service/daemon rather than daemon. So we trust the levels up and
        # down from us to make decisions in that regard.
        self.context.files_preserve = [sys.stdin, sys.stdout, sys.stderr, 0, 1, 2]
        self.context.detach_process = False
        self.context.stdin = sys.stdin
        self.context.stdout = sys.stdout
        self.context.stderr = sys.stderr

        # This is a majority of the signals from the python signal module
        # in Python 2.7.8 on Fedora 21, subtracting out those that don't
        # make sense. SIGSEGV and SIGSTOP are intentionally not mapped.
        self.context.signal_map = {
            signal.SIGABRT: self.signal_abrt,
            signal.SIGALRM: self.signal_alrm,
            signal.SIGCHLD: self.signal_chld,
            signal.SIGCONT: self.signal_cont,
            signal.SIGHUP: self.signal_hup,
            signal.SIGINT: self.signal_int,
            signal.SIGPIPE: self.signal_pipe,
            signal.SIGPROF: self.signal_prof,
            signal.SIGQUIT: self.signal_quit,
            signal.SIGSYS: self.signal_sys,
            signal.SIGTERM: self.signal_term,
            signal.SIGTRAP: self.signal_trap,
            signal.SIGTSTP: self.signal_tstp,
            signal.SIGTTIN: self.signal_ttin,
            signal.SIGTTOU: self.signal_ttou,
            signal.SIGURG: self.signal_urg,
            signal.SIGUSR1: self.signal_usr1,
            signal.SIGUSR2: self.signal_usr2,
            signal.SIGVTALRM: self.signal_vtalrm,
        }

    # Per-signal handlers. signal_map needs a callable per signal; each one
    # simply hands its signal number to passthrough_signal().
    def signal_abrt(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGABRT)

    def signal_alrm(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGALRM)

    def signal_chld(self, *args, **kwargs):
        """
        Handle SIGCHLD: exit with the child's status once it has terminated.

        If poll() reports the child is still running (returns None), this
        handler does nothing.
        """
        if self.process is None:
            # The child can exit before Popen() returns and self.process is
            # assigned. Don't guess an exit code here; the poll in run()
            # picks up the real status as soon as the process is known.
            return
        status = self.process.poll()
        if status is not None:
            sys.exit(self._exit_status(status))

    def signal_cont(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGCONT)

    def signal_hup(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGHUP)

    def signal_int(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGINT)

    def signal_pipe(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGPIPE)

    def signal_prof(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGPROF)

    def signal_quit(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGQUIT)

    def signal_segv(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGSEGV)

    def signal_stop(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGSTOP)

    def signal_sys(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGSYS)

    def signal_term(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGTERM)

    def signal_trap(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGTRAP)

    def signal_tstp(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGTSTP)

    def signal_ttin(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGTTIN)

    def signal_ttou(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGTTOU)

    def signal_urg(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGURG)

    def signal_usr1(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGUSR1)

    def signal_usr2(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGUSR2)

    def signal_vtalrm(self, *args, **kwargs):
        self.passthrough_signal(signal.SIGVTALRM)

    @staticmethod
    def _log(message):
        """
        Write one diagnostic line to stdout and flush it immediately.

        stdout is meant to be piped to Docker/supervisord logging, where it
        may be block-buffered; flushing keeps these lines timely.
        """
        sys.stdout.write(message + "\n")
        sys.stdout.flush()

    @staticmethod
    def _exit_status(returncode):
        """
        Translate a Popen returncode into an exit status for this wrapper.

        Popen reports "killed by signal N" as -N. Passing -N to sys.exit()
        would be truncated to 256 - N on POSIX, so use the shell convention
        128 + N instead. Non-negative codes are returned unchanged.
        """
        if returncode < 0:
            return 128 - returncode
        return returncode

    def passthrough_signal(self, signal):
        """
        Forward a signal to the child, or queue it if it must be delayed.

        :param signal: signal number. (This parameter shadows the ``signal``
            module inside this method; the name is kept for compatibility.)
        """
        signum = signal
        self._log("Got signal %d" % signum)
        if signum in self.signals_to_delay:
            # Delayed signals are released later by check_signal_queue().
            self.signal_queue.append([time.time(), signum])
        elif self.process is None:
            # Arrived before the child was started: nothing to forward to.
            self._log("No child process yet; dropping signal %d" % signum)
        else:
            self.process.send_signal(signum)

    def check_signal_queue(self):
        """Forward, oldest first, every queued signal that has waited >= delay seconds."""
        threshold = time.time() - self.delay
        while self.signal_queue and self.signal_queue[0][0] <= threshold:
            _, signum = self.signal_queue.pop(0)
            self._log("Sending signal %d" % signum)
            self.process.send_signal(signum)

    def run(self):
        """
        Start the child inside the daemon context and supervise it.

        Loops until the child terminates: forwards delayed signals as they
        come due, and exits with the child's status once it is gone. The exit
        happens either via signal_chld() or via the poll() check below.
        """
        with self.context:
            # Popen is called without PIPE arguments, so the child inherits
            # this process's stdin/stdout/stderr. The subprocess docs warn
            # about deadlocks with large PIPE output; revisit that warning
            # if pipes are ever added here.
            #
            # See:
            # https://docs.python.org/2/library/subprocess.html#subprocess.Popen
            self.process = subprocess.Popen(self.command)

            # SIGCHLD normally notifies us of the child's death. Checking
            # poll() here as well covers a child that exited before
            # self.process was assigned, and contexts without a SIGCHLD
            # handler. We wake every sleep_interval seconds (0.1s by default)
            # so delayed signals are released promptly.
            while True:
                self.check_signal_queue()
                status = self.process.poll()
                if status is not None:
                    sys.exit(self._exit_status(status))
                time.sleep(self.sleep_interval)
if __name__ == "__main__":
    # Everything after the script name is the command line to wrap. Without
    # it, Popen([]) would fail inside the daemon context with an opaque error.
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s COMMAND [ARG...]\n" % sys.argv[0])
        sys.exit(2)
    process = ProcessPassthrough(sys.argv[1:])
    process.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import signal | |
| import unittest | |
| from mock import Mock, MagicMock | |
| from daemon_wrapper import ProcessPassthrough | |
class DoneException(BaseException):
    """
    Sentinel raised by the fake child's poll() to break out of run().

    It derives from BaseException rather than Exception, so handlers that
    catch ``Exception`` do not catch it.
    """
class testDaemonWrapper(unittest.TestCase):
    """
    Drives ProcessPassthrough.run() against a virtual clock, a fake Popen and
    a fake DaemonContext, and checks which signals reach the child and when.
    """

    def setUp(self):
        self.triggeredSignals = []
        # Hook invoked by the fake sleep() after each clock step; tests that
        # need per-step checks replace it.
        self.tick = lambda *args: None
        # Each setUp* helper registers its own cleanup, so patches are undone
        # even if a later helper fails part-way through setUp.
        self.setUpDaemonContext()
        self.setUpPopen()
        self.setUpClock()
        self.subject = ProcessPassthrough(['sleep', '5'])
        self.subject.sleep_interval = 1

    def setUpDaemonContext(self):
        """Patch daemon.DaemonContext to return a context mock whose enter/exit do nothing."""
        import daemon
        fakeContext = MagicMock()
        # Both return None, so exceptions raised inside the `with` block
        # (e.g. DoneException) propagate out of run().
        fakeContext.__enter__.side_effect = lambda *args, **kwargs: None
        fakeContext.__exit__.side_effect = lambda *args, **kwargs: None
        self.oldDaemonContext = daemon.DaemonContext
        daemon.DaemonContext = lambda: fakeContext
        self.addCleanup(self.tearDownDaemonContext)

    def tearDownDaemonContext(self):
        import daemon
        daemon.DaemonContext = self.oldDaemonContext

    def setUpPopen(self):
        """
        Patch subprocess.Popen to return a scripted fake child.

        The fake child's poll() reports "still running" (None) until the
        virtual clock reaches 120s, then raises DoneException. Every
        send_signal() call is recorded in self.triggeredSignals.
        """
        import subprocess
        self.processData = {
            'sent_sigchld': False
        }

        def process_poll():
            if self.clockState['time'] >= 120:
                self.processData['sent_sigchld'] = True
                raise DoneException("end")
            return None

        def process_send_signal(signum):
            self.triggeredSignals.append(signum)

        self.processMock = MagicMock()
        self.processMock.poll = process_poll
        self.processMock.send_signal = process_send_signal
        self.oldPopen = subprocess.Popen
        subprocess.Popen = lambda *args, **kwargs: self.processMock
        self.addCleanup(self.tearDownPopen)

    def tearDownPopen(self):
        import subprocess
        subprocess.Popen = self.oldPopen

    def setUpClock(self):
        """Patch time.time/time.sleep with a virtual clock that sleep() advances."""
        import time
        self.clockState = {
            'time': 0
        }

        def mockSleep(increment):
            self.clockState['time'] += increment
            if self.clockState['time'] > 300:
                self.fail("Ran out of time")
            self.tick()

        def mockTime():
            return self.clockState['time']

        self.oldSleep = time.sleep
        self.oldTime = time.time
        time.sleep = mockSleep
        time.time = mockTime
        self.addCleanup(self.tearDownClock)

    def tearDownClock(self):
        import time
        time.sleep = self.oldSleep
        time.time = self.oldTime

    def triggerAllSignals(self):
        """Invoke every per-signal handler of the subject once."""
        self.subject.signal_abrt()
        self.subject.signal_alrm()
        self.subject.signal_chld()
        self.subject.signal_cont()
        self.subject.signal_hup()
        self.subject.signal_int()
        self.subject.signal_pipe()
        self.subject.signal_prof()
        self.subject.signal_quit()
        self.subject.signal_segv()
        self.subject.signal_stop()
        self.subject.signal_sys()
        self.subject.signal_term()
        self.subject.signal_trap()
        self.subject.signal_tstp()
        self.subject.signal_ttin()
        self.subject.signal_ttou()
        self.subject.signal_urg()
        self.subject.signal_usr1()
        self.subject.signal_usr2()
        self.subject.signal_vtalrm()

    def checkSignals(self, expected):
        """Assert the forwarded signals equal `expected`, ignoring order (inputs are not mutated)."""
        self.assertEqual(sorted(expected), sorted(self.triggeredSignals))

    def expectNoSignals(self):
        self.assertEqual(self.triggeredSignals, [])

    def expectImmediateSignals(self):
        expectedSignals = [
            signal.SIGCONT,
            signal.SIGHUP,
            signal.SIGPIPE,
            signal.SIGPROF,
            signal.SIGSEGV,
            signal.SIGSTOP,
            signal.SIGSYS,
            signal.SIGTRAP,
            signal.SIGTSTP,
            signal.SIGTTIN,
            signal.SIGTTOU,
            signal.SIGURG,
            signal.SIGUSR1,
            signal.SIGUSR2,
            signal.SIGVTALRM,
        ]
        self.checkSignals(expectedSignals)

    def expectDelayedSignals(self):
        expectedSignals = [
            signal.SIGABRT,
            signal.SIGALRM,
            signal.SIGINT,
            signal.SIGQUIT,
            signal.SIGTERM,
        ]
        self.checkSignals(expectedSignals)

    def expectAllSignals(self):
        expectedSignals = [
            signal.SIGABRT,
            signal.SIGALRM,
            signal.SIGCONT,
            signal.SIGHUP,
            signal.SIGINT,
            signal.SIGPIPE,
            signal.SIGPROF,
            signal.SIGQUIT,
            signal.SIGSEGV,
            signal.SIGSTOP,
            signal.SIGSYS,
            signal.SIGTERM,
            signal.SIGTRAP,
            signal.SIGTSTP,
            signal.SIGTTIN,
            signal.SIGTTOU,
            signal.SIGURG,
            signal.SIGUSR1,
            signal.SIGUSR2,
            signal.SIGVTALRM,
        ]
        self.checkSignals(expectedSignals)

    def resetSignals(self):
        self.triggeredSignals = []

    def test_execution_in_steps(self):
        """
        Non-delayed signals are forwarded at once; delayed ones only later.

        At t=10s every handler fires. Non-delayed signals must be forwarded
        immediately. By t=80s the delayed ones must have been forwarded too.
        The fake child ends the run at t=120s.
        """
        def tick(*args):
            now = self.clockState['time']
            if now == 10:
                self.triggerAllSignals()
                self.expectImmediateSignals()
                self.resetSignals()
            if now == 80:
                self.expectDelayedSignals()
        self.tick = tick
        try:
            self.subject.run()
        except DoneException:
            self.expectDelayedSignals()
            return
        self.fail("Should not have gotten here")
if __name__ == '__main__':
    # Running this file directly discovers and runs the TestCases defined in
    # this module (module='__main__' is unittest.main's default, made explicit).
    unittest.main(module='__main__')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment