Last active
April 23, 2018 21:36
-
-
Save growse/f37ca9da772d9b43b97d819e2d08eac1 to your computer and use it in GitHub Desktop.
Coordinating multiple python2.7 processes, IPC and signals
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import os | |
import signal | |
import sys | |
import multiprocessing | |
import threading | |
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) | |
magic_queue = multiprocessing.Queue() | |
# Quit on these things | |
signals = (signal.SIGINT, signal.SIGTERM, signal.SIGCHLD, signal.SIGQUIT) | |
# This subprocess methods doesn't need to signal to quit, it just needs to see if anyone has set my_event | |
def do_thing(): | |
ppid = os.getppid() # Write down what our parent pid is | |
my_event = threading.Event() | |
def my_signal_handler(signal, _): | |
logging.info("Do Thing Signal handler") | |
my_event.set() | |
# Signal handlers are inherited. So we need to ignore them when sent to the child. | |
for signame in signals: | |
signal.signal(signame, my_signal_handler) | |
while not my_event.is_set(): | |
logging.info("Doing a thing") | |
my_event.wait(5) | |
if ppid != os.getppid(): # If this has happened, parent is dead and we're a zombie. Probably best to stop. | |
my_event.set() | |
# This subprocess may need to tell everyone to quit, so needs the queue that the master is blocking on. But it also | |
# needs the event, to listen to other things that might want to tell it to stop. | |
def do_another_thing(my_queue): | |
ppid = os.getppid() | |
my_event = threading.Event() | |
def my_signal_handler(signal, _): | |
logging.info("Do Another Thing Signal handler") | |
my_event.set() | |
for signame in signals: | |
signal.signal(signame, my_signal_handler) | |
while not my_event.is_set(): | |
if os.path.isfile("quitnow"): | |
logging.info("Found quit file, quitting") | |
break | |
my_event.wait(1) | |
if ppid != os.getppid(): | |
my_event.set() | |
my_queue.put("PLS STOP") | |
def signal_handler(received_signal, _): | |
logging.info("Received signal {}".format(received_signal)) | |
if received_signal == signal.SIGCHLD: | |
pid, status = os.waitpid(-1, os.WNOHANG | os.WUNTRACED | os.WCONTINUED) | |
if os.WIFCONTINUED(status) or os.WIFSTOPPED(status): | |
return | |
if os.WIFSIGNALED(status) or os.WIFEXITED(status): | |
logging.info("SIGCHLD. Something happened. Pid: {}, Status: {}".format(pid, status)) | |
logging.info("Something signalled or exited. Term sig is {}".format(os.WTERMSIG(status))) | |
magic_queue.put(True) | |
else: | |
# Let's assume any signal means "quit" | |
logging.info("Signal handler queuing a message") | |
magic_queue.put(True) | |
for signame in signals: | |
signal.signal(signame, signal_handler) | |
joinables = [] | |
subprocess1 = multiprocessing.Process(target=do_thing) | |
joinables.append(subprocess1) | |
subprocess1.start() | |
subprocess2 = multiprocessing.Process(target=do_another_thing, args=(magic_queue,)) | |
joinables.append(subprocess2) | |
subprocess2.start() | |
logging.info("Waiting on... something?") | |
# At the first whiff of trouble, something sends a message to this queue and we bring the whole thing down. | |
# We use a queue here because inexplicably, Event.wait() deadlocks with the signal_handler. | |
result = magic_queue.get() | |
logging.info("Got the queue message: {}".format(result)) | |
signal.signal(signal.SIGCHLD, signal.SIG_DFL) # We don't care for SIGCHLDs any more. | |
logging.info("Killing children") | |
for joinable in joinables: | |
try: | |
logging.info("Trying to terminate {} pid {}".format(joinable, joinable.pid)) | |
joinable.terminate() | |
except OSError: | |
# Turns out the subprocess might not exist any more, even if the master thinks it does | |
pass | |
# Wait for stuff to stop | |
for joinable in joinables: | |
logging.info("joining on {}".format(joinable)) | |
joinable.join() | |
# Hurray! | |
logging.info("exit") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment