Created
February 10, 2014 09:03
-
-
Save sp3c73r2038/8912632 to your computer and use it in GitHub Desktop.
forking worker process server, handling signals with a queue. idea stolen from unicorn.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import datetime | |
import errno | |
from fcntl import fcntl, F_SETFL | |
import logging | |
import os | |
from select import select | |
from signal import signal, SIGQUIT, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIGHUP | |
from signal import SIGCHLD, SIG_DFL, SIGKILL | |
import sys | |
from time import sleep | |
logging.basicConfig(format=('[%(asctime)s %(levelname)s ' | |
'(%(process)d)] %(message)s'), | |
level=logging.DEBUG) | |
class Server(object): | |
QUEUE_SIGS = [SIGQUIT, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIGHUP] | |
EXIT_SIGS = [SIGQUIT, SIGTERM, SIGINT] | |
WORKER_QUEUE_SIGS = list(set(QUEUE_SIGS) - set(EXIT_SIGS)) | |
WORKERS = {} | |
START_CTX = {} | |
def __init__(self): | |
self.SIG_QUEUE = [] | |
# the main point of sig queue is this pair of pipe | |
# using system call pipe(2), for more info see pipe(7) | |
self.pipes = os.pipe() | |
self.worker_cnt = 2 | |
for fd in self.pipes: | |
fcntl(fd, F_SETFL, os.O_NONBLOCK) | |
self._r = os.fdopen(self.pipes[0], 'r') | |
self._w = os.fdopen(self.pipes[1], 'w') | |
self.master_pid = os.getpid() | |
def awaken(self): | |
"""write to w pipe to activate the select""" | |
self._w.write('.') | |
self._w.flush() | |
def spawn_missing_worker(self): | |
for nr in range(self.worker_cnt): | |
if nr in self.WORKERS: | |
continue | |
else: | |
worker = Worker(nr) | |
_pid = os.fork() | |
if _pid: | |
# master | |
self.WORKERS[_pid] = worker | |
else: | |
self.ppid = self.master_pid | |
self.pid = os.getpid() | |
self.worker_loop(worker) | |
sys.exit() | |
def init_worker_process(self, worker): | |
def _exit(sig, frames): | |
logging.info(sig) | |
sys.exit(0) | |
for sig in self.EXIT_SIGS: | |
signal(sig, _exit) | |
for sig in self.WORKER_QUEUE_SIGS: | |
signal(sig, SIG_DFL) | |
signal(SIGCHLD, SIG_DFL) | |
self.SIG_QUEUE.clear() | |
self.WORKERS.clear() | |
self.START_CTX.clear() | |
def worker_loop(self, worker): | |
self.init_worker_process(worker) | |
worker.work() | |
def start(self): | |
def queue_sig(sig, frames): | |
self.SIG_QUEUE.append(sig) | |
self.awaken() | |
for sig in self.QUEUE_SIGS: | |
# handle all sig in this func | |
signal(sig, queue_sig) | |
self.spawn_missing_worker() | |
return self | |
def stop(self, graceful=True): | |
timeout = 10 | |
_start = datetime.now() | |
while len(self.WORKERS) == 0 or\ | |
(datetime.now() - _start).seconds > timeout: | |
if graceful: | |
self.kill_each_worker(SIGQUIT) | |
else: | |
self.kill_each_worker(SIGTERM) | |
sleep(0.1) | |
self.reap_all_workers() | |
self.kill_each_worker(SIGKILL) | |
def join(self): | |
while True: | |
logging.info(self.SIG_QUEUE) | |
if len(self.SIG_QUEUE) > 0: | |
# signal handling | |
sig = self.SIG_QUEUE.pop(0) | |
if sig in [SIGQUIT, SIGINT, SIGTERM]: | |
self.stop() | |
logging.info('quit') | |
return | |
elif sig == SIGHUP: | |
logging.info('hup: reload config') | |
elif sig == SIGUSR1: | |
logging.info('usr1') | |
elif sig == SIGUSR2: | |
logging.info('usr2') | |
else: | |
self.master_sleep() | |
def kill_worker(self, pid, sig): | |
"""murder a worker via send QUIT signal to worker process""" | |
logging.info('killing worker {}'.format(pid)) | |
try: | |
os.kill(pid, sig) | |
except OSError as e: | |
if e.errno == errno.ESRCH: | |
logging.warn('worker pid: {} not found'.format(pid)) | |
elif e.errno in (errno.EPERM, errno.EACCES): | |
logging.warn(('cannot kill worker pid: {} ' | |
'permission denied').format(pid)) | |
def kill_each_worker(self, sig): | |
logging.info(self.WORKERS) | |
for pid in self.WORKERS: | |
self.kill_worker(pid, SIGQUIT) | |
def reap_all_workers(self): | |
while True: | |
try: | |
wpid, status = os.waitpid(-1, os.WNOHANG) | |
if len(wpid) == 0 : | |
return | |
worker = self.WORKERS.pop(wpid) | |
logging.info('reaped worker {}, {}'.format(pid, status)) | |
except errno.ECHLD: | |
break | |
def master_sleep(self): | |
try: | |
# once the r pipe is ready, master will be activated | |
r, _, _ = select([self.pipes[0]], [], [], 5) | |
logging.info(r) | |
if r and r[0] == self.pipes[0]: | |
# we must read it, or it will cause r pipe stay in | |
# ready status for ever ! | |
logging.info(self._r.read(1024)) | |
except InterruptedError as e: | |
# handle awkward EINTR in select, even though we've made | |
# handler to capture SIGINT | |
if e.errno == errno.EINTR: | |
pass | |
else: | |
import traceback | |
traceback.print_exc() | |
class Worker(object): | |
def __init__(self, _id): | |
self.id = _id | |
def work(self): | |
while True: | |
sleep(1) | |
s = Server() | |
s.start().join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment