Skip to content

Instantly share code, notes, and snippets.

@israelshirk
Last active August 29, 2015 14:10
Show Gist options
  • Select an option

  • Save israelshirk/c7388b88ef3de6cdf927 to your computer and use it in GitHub Desktop.

Select an option

Save israelshirk/c7388b88ef3de6cdf927 to your computer and use it in GitHub Desktop.
Daemon wrapper for lagging signals
#!/usr/bin/python
import daemon
import grp
import os
import signal
import subprocess
import sys
import time
class ProcessPassthrough(object):
"""
This class acts as a gateway to a child process, delaying certain signals
for a hardcoded period of time such that the process it wraps will stay
alive until its cluster membership has been removed gracefully by other
services receiving the same set of signals from supervisord/systemd/etc.
The default timeout here is 60s - a bit overkill, but a good guarantee
that messages would propagate appropriate in the context of unreliable
message distribution and during failover scenarios.
"""
# This is a FIFO queue of signals which should be delayed by a certain
# number of seconds
signal_queue = []
# Signals we want to lag to our child so that other process can remove its
# membership from the relevant coordination service (and thus divert incoming
# requests) before it actually shuts down
signals_to_delay = [
signal.SIGABRT,
signal.SIGALRM,
signal.SIGINT,
signal.SIGQUIT,
signal.SIGTERM,
]
# Amount by which to lag above-enumerated signals
delay = 30
# Interval to sleep between polls of the child process
sleep_interval = 1/10.0
# Placeholder for the process reference from subprocess.Popen()
process = None
def __init__(self, command, context = None):
self.command = command
if context:
self.context = context
else:
self.context = daemon.DaemonContext()
# We actually want these to pipe out for standard Docker/Supervisord
# logging - things underneath may be running in a context that's a mix
# of service/daemon rather than daemon. So we trust the levels up and
# down from us to make decisions in that regard.
self.context.files_preserve = [sys.stdin, sys.stdout, sys.stderr, 0, 1, 2]
self.context.detach_process = False
self.context.stdin = sys.stdin
self.context.stdout = sys.stdout
self.context.stderr = sys.stderr
# This is a majority of the signals from the python signal module
# in Python 2.7.8 on Fedora 21, subtracting out those that don't
# make sense
self.context.signal_map = {
signal.SIGABRT: self.signal_abrt,
signal.SIGALRM: self.signal_alrm,
signal.SIGCHLD: self.signal_chld,
signal.SIGCONT: self.signal_cont,
signal.SIGHUP: self.signal_hup,
signal.SIGINT: self.signal_int,
signal.SIGPIPE: self.signal_pipe,
signal.SIGPROF: self.signal_prof,
signal.SIGQUIT: self.signal_quit,
# signal.SIGSEGV: self.signal_segv,
# signal.SIGSTOP: self.signal_stop,
signal.SIGSYS: self.signal_sys,
signal.SIGTERM: self.signal_term,
signal.SIGTRAP: self.signal_trap,
signal.SIGTSTP: self.signal_tstp,
signal.SIGTTIN: self.signal_ttin,
signal.SIGTTOU: self.signal_ttou,
signal.SIGURG: self.signal_urg,
signal.SIGUSR1: self.signal_usr1,
signal.SIGUSR2: self.signal_usr2,
signal.SIGVTALRM: self.signal_vtalrm,
}
def signal_abrt(self, *argsA, **argB):
self.passthrough_signal(signal.SIGABRT)
def signal_alrm(self, *argsA, **argB):
self.passthrough_signal(signal.SIGALRM)
def signal_chld(self, *argsA, **argB):
"""
This gets hit when the child process exits.
"""
if self.process:
status = self.process.poll()
if status != None:
sys.exit(status)
else:
print "self.process went away for inexplicable reasons."
sys.exit(1)
def signal_cont(self, *argsA, **argB):
self.passthrough_signal(signal.SIGCONT)
def signal_hup(self, *argsA, **argB):
self.passthrough_signal(signal.SIGHUP)
def signal_int(self, *argsA, **argB):
self.passthrough_signal(signal.SIGINT)
def signal_pipe(self, *argsA, **argB):
self.passthrough_signal(signal.SIGPIPE)
def signal_prof(self, *argsA, **argB):
self.passthrough_signal(signal.SIGPROF)
def signal_quit(self, *argsA, **argB):
self.passthrough_signal(signal.SIGQUIT)
def signal_segv(self, *argsA, **argB):
self.passthrough_signal(signal.SIGSEGV)
def signal_stop(self, *argsA, **argB):
self.passthrough_signal(signal.SIGSTOP)
def signal_sys(self, *argsA, **argB):
self.passthrough_signal(signal.SIGSYS)
def signal_term(self, *argsA, **argB):
self.passthrough_signal(signal.SIGTERM)
def signal_trap(self, *argsA, **argB):
self.passthrough_signal(signal.SIGTRAP)
def signal_tstp(self, *argsA, **argB):
self.passthrough_signal(signal.SIGTSTP)
def signal_ttin(self, *argsA, **argB):
self.passthrough_signal(signal.SIGTTIN)
def signal_ttou(self, *argsA, **argB):
self.passthrough_signal(signal.SIGTTOU)
def signal_urg(self, *argsA, **argB):
self.passthrough_signal(signal.SIGURG)
def signal_usr1(self, *argsA, **argB):
self.passthrough_signal(signal.SIGUSR1)
def signal_usr2(self, *argsA, **argB):
self.passthrough_signal(signal.SIGUSR2)
def signal_vtalrm(self, *argsA, **argB):
self.passthrough_signal(signal.SIGVTALRM)
def passthrough_signal(self, signal):
print "Got signal %d" % signal
if signal in self.signals_to_delay:
self.signal_queue.append([time.time(), signal])
else:
self.process.send_signal(signal)
def check_signal_queue(self):
if not self.signal_queue:
return
threshold = time.time() - self.delay
signal_timestamp = self.signal_queue[0][0]
while threshold >= signal_timestamp:
signal_info = self.signal_queue.pop(0)
print "Sending signal", signal_info[1]
self.process.send_signal(signal_info[1])
if not self.signal_queue:
return
signal_timestamp = self.signal_queue[0][0]
def run(self):
with self.context:
# Some docs say this can hard-lock things on large,
# outputs to/from the pipes but I haven't been
# able to get it to do so and haven't found an
# alternative. If you have weird stuff going on,
# that's probably a great place to start.
#
# See:
# https://docs.python.org/2/library/subprocess.html#subprocess.Popen
#
self.process = subprocess.Popen(
self.command
)
# We catch the sigchld signal above for notification
# of death of the child process. So checking status
# here would be a secondary thing.
#
# The expectation here is that Python is trying
# to do some reading and buffers may fill up;
# blocking too long here may cause issues. So
# we just return to the Python process every
# 1/10^-1s to let the subprocess module do its
# thing. YMMV.
while 1:
self.check_signal_queue()
self.process.poll()
time.sleep(self.sleep_interval)
if __name__ == "__main__":
process = ProcessPassthrough(sys.argv[1:])
process.run()
import signal
import unittest
from mock import Mock, MagicMock
from daemon_wrapper import ProcessPassthrough
class DoneException(BaseException):
pass
class testDaemonWrapper(unittest.TestCase):
def setUp(self):
self.setUpDaemonContext()
self.setUpPopen()
self.setUpClock()
self.subject = ProcessPassthrough(['sleep', '5'])
self.subject.sleep_interval = 1
self.triggeredSignals = []
def tearDown(self):
self.tearDownDaemonContext()
self.tearDownPopen()
self.tearDownClock()
def setUpDaemonContext(self):
fakeContext = MagicMock()
def skippy(*argc, **argv):
return
fakeContext.__enter__.side_effect = skippy
fakeContext.__exit__.side_effect = skippy
def getFakeContextMock():
return fakeContext
import daemon
self.oldDaemonContext = daemon.DaemonContext
daemon.DaemonContext = getFakeContextMock
def tearDownDaemonContext(self):
import daemon
daemon.DaemonContext = self.oldDaemonContext
def setUpPopen(self):
self.processData = {
'sent_sigchld': False
}
class FakeProcess(object):
def __init__(self):
pass
self.processMock = MagicMock()
def process_poll():
if self.clockState['time'] >= 120:
self.processData['sent_sigchld'] = True
raise DoneException("end")
return None
def process_send_signal(signum):
self.triggeredSignals.append(signum)
self.processMock.poll = Mock()
self.processMock.poll = process_poll
self.processMock.send_signal = Mock()
self.processMock.send_signal = process_send_signal
def getProcessMock(*argc, **argv):
return self.processMock
import subprocess
self.oldPopen = subprocess.Popen
subprocess.Popen = getProcessMock
def tearDownPopen(self):
import subprocess
subprocess.Popen = self.oldPopen
def setUpClock(self):
self.clockState = {
'time': 0
}
def mockSleep(increment):
self.clockState['time'] += increment
if self.clockState['time'] > 300:
self.fail("Ran out of time")
self.tick()
def mockTime():
return self.clockState['time']
import time
self.oldSleep = time.sleep
self.oldTime = time.time
time.sleep = mockSleep
time.time = mockTime
def tearDownClock(self):
import time
time.sleep = self.oldSleep
time.time = self.oldTime
def triggerAllSignals(self):
self.subject.signal_abrt()
self.subject.signal_alrm()
self.subject.signal_chld()
self.subject.signal_cont()
self.subject.signal_hup()
self.subject.signal_int()
self.subject.signal_pipe()
self.subject.signal_prof()
self.subject.signal_quit()
self.subject.signal_segv()
self.subject.signal_stop()
self.subject.signal_sys()
self.subject.signal_term()
self.subject.signal_trap()
self.subject.signal_tstp()
self.subject.signal_ttin()
self.subject.signal_ttou()
self.subject.signal_urg()
self.subject.signal_usr1()
self.subject.signal_usr2()
self.subject.signal_vtalrm()
def checkSignals(self, expected):
actual = self.triggeredSignals
actual.sort()
expected.sort()
self.assertEqual(expected, actual)
def expectNoSignals(self):
self.assertEqual(self.triggeredSignals, [])
def expectImmediateSignals(self):
expectedSignals = [
signal.SIGCONT,
signal.SIGHUP,
signal.SIGPIPE,
signal.SIGPROF,
signal.SIGSEGV,
signal.SIGSTOP,
signal.SIGSYS,
signal.SIGTRAP,
signal.SIGTSTP,
signal.SIGTTIN,
signal.SIGTTOU,
signal.SIGURG,
signal.SIGUSR1,
signal.SIGUSR2,
signal.SIGVTALRM,
]
self.checkSignals(expectedSignals)
def expectDelayedSignals(self):
expectedSignals = [
signal.SIGABRT,
signal.SIGALRM,
signal.SIGINT,
signal.SIGQUIT,
signal.SIGTERM,
]
self.checkSignals(expectedSignals)
def expectAllSignals(self):
expectedSignals = [
signal.SIGABRT,
signal.SIGALRM,
signal.SIGCONT,
signal.SIGHUP,
signal.SIGINT,
signal.SIGPIPE,
signal.SIGPROF,
signal.SIGQUIT,
signal.SIGSEGV,
signal.SIGSTOP,
signal.SIGSYS,
signal.SIGTERM,
signal.SIGTRAP,
signal.SIGTSTP,
signal.SIGTTIN,
signal.SIGTTOU,
signal.SIGURG,
signal.SIGUSR1,
signal.SIGUSR2,
signal.SIGVTALRM,
]
self.checkSignals(expectedSignals)
def resetSignals(self):
self.triggeredSignals = []
def test_execution_in_steps(self):
def tick(*args):
if self.clockState['time'] == 10:
self.triggerAllSignals()
self.expectImmediateSignals()
self.resetSignals()
if self.clockState['time'] == 80:
self.expectDelayedSignals()
self.tick = tick
from daemon_wrapper import ProcessPassthrough
try:
self.subject.run()
except DoneException:
self.expectDelayedSignals()
return
raise Exception("Should not have gotten here")
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment