Skip to content

Instantly share code, notes, and snippets.

@sp3c73r2038
Created February 10, 2014 09:03
Show Gist options
  • Save sp3c73r2038/8912632 to your computer and use it in GitHub Desktop.
Save sp3c73r2038/8912632 to your computer and use it in GitHub Desktop.
A forking worker-process server that handles signals through a queue. The idea is borrowed from Unicorn.
#!/usr/bin/env python
from datetime import datetime
import errno
from fcntl import fcntl, F_SETFL
import logging
import os
from select import select
from signal import signal, SIGQUIT, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIGHUP
from signal import SIGCHLD, SIG_DFL, SIGKILL
import sys
from time import sleep
# Root logger setup: every record carries its timestamp, level and the pid of
# the emitting process, which tells master and worker output apart.
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s %(levelname)s (%(process)d)] %(message)s',
)
class Server(object):
    """Pre-forking master process that supervises a pool of workers.

    Signals delivered to the master are not acted upon inside the signal
    handler. The handler only appends the signal number to ``SIG_QUEUE``
    and writes one byte to a non-blocking pipe; that wakes the ``select``
    call in ``master_sleep`` so that ``join`` can process the signal from
    the main loop.
    """

    # Signals the master queues for later processing in ``join``.
    QUEUE_SIGS = [SIGQUIT, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIGHUP]
    # Signals that shut the master down (workers exit on them as well).
    EXIT_SIGS = [SIGQUIT, SIGTERM, SIGINT]
    # Queued signals that workers reset to their default disposition.
    WORKER_QUEUE_SIGS = list(set(QUEUE_SIGS) - set(EXIT_SIGS))
    # pid -> Worker for every forked child. Class-level, hence shared by
    # all Server instances in the same process.
    WORKERS = {}
    # Never populated in this file; only cleared in the worker process.
    START_CTX = {}

    def __init__(self):
        """Create the wake-up pipe and record the master's pid."""
        self.SIG_QUEUE = []
        # The heart of the signal queue is this pipe pair, see pipe(7).
        self.pipes = os.pipe()
        self.worker_cnt = 2
        for fd in self.pipes:
            fcntl(fd, F_SETFL, os.O_NONBLOCK)
        # Unbuffered binary streams: ``awaken`` runs inside a signal
        # handler, where buffered I/O objects may raise on reentrant use,
        # and buffered/text layers are not meant for non-blocking fds.
        self._r = os.fdopen(self.pipes[0], 'rb', 0)
        self._w = os.fdopen(self.pipes[1], 'wb', 0)
        self.master_pid = os.getpid()

    def awaken(self):
        """Write one byte to the pipe so a pending ``select`` returns."""
        # On a full non-blocking pipe the raw write returns None instead of
        # raising; a wake-up is already pending then, so nothing is lost.
        self._w.write(b'.')

    def spawn_missing_worker(self):
        """Fork a worker for every slot number that has no live worker."""
        # WORKERS is keyed by pid, so occupied slots have to be derived
        # from the stored Worker objects rather than from the keys.
        occupied = {worker.id for worker in self.WORKERS.values()}
        for nr in range(self.worker_cnt):
            if nr in occupied:
                continue
            worker = Worker(nr)
            _pid = os.fork()
            if _pid:
                # master
                self.WORKERS[_pid] = worker
            else:
                # child: run the worker loop and never return to the caller
                self.ppid = self.master_pid
                self.pid = os.getpid()
                self.worker_loop(worker)
                sys.exit()

    def init_worker_process(self, worker):
        """Reset inherited master state inside a freshly forked worker.

        Exit signals terminate the worker with status 0, the remaining
        queued signals and SIGCHLD get their default disposition, and the
        bookkeeping containers copied from the master are emptied.
        """
        def _exit(sig, frames):
            logging.info(sig)
            sys.exit(0)

        for sig in self.EXIT_SIGS:
            signal(sig, _exit)
        for sig in self.WORKER_QUEUE_SIGS:
            signal(sig, SIG_DFL)
        signal(SIGCHLD, SIG_DFL)
        self.SIG_QUEUE.clear()
        self.WORKERS.clear()
        self.START_CTX.clear()

    def worker_loop(self, worker):
        """Prepare the child process, then hand control to ``worker``."""
        self.init_worker_process(worker)
        worker.work()

    def start(self):
        """Install the queueing signal handlers and fork the workers.

        Returns ``self`` so that calls can be chained.
        """
        def queue_sig(sig, frames):
            self.SIG_QUEUE.append(sig)
            self.awaken()

        for sig in self.QUEUE_SIGS:
            # every queued signal goes through the same handler
            signal(sig, queue_sig)
        self.spawn_missing_worker()
        return self

    def stop(self, graceful=True, timeout=10):
        """Signal the workers to exit and reap them.

        SIGQUIT (``graceful``) or SIGTERM is re-sent every 0.1 s until all
        workers have been reaped or ``timeout`` seconds have passed; any
        worker still registered afterwards receives SIGKILL.
        """
        sig = SIGQUIT if graceful else SIGTERM
        started = datetime.now()
        while (self.WORKERS and
               (datetime.now() - started).total_seconds() < timeout):
            self.kill_each_worker(sig)
            sleep(0.1)
            self.reap_all_workers()
        self.kill_each_worker(SIGKILL)

    def join(self):
        """Run the master loop until an exit signal has been processed."""
        while True:
            logging.info(self.SIG_QUEUE)
            if not self.SIG_QUEUE:
                self.master_sleep()
                continue
            # signal handling
            sig = self.SIG_QUEUE.pop(0)
            if sig in self.EXIT_SIGS:
                self.stop()
                logging.info('quit')
                return
            elif sig == SIGHUP:
                logging.info('hup: reload config')
            elif sig == SIGUSR1:
                logging.info('usr1')
            elif sig == SIGUSR2:
                logging.info('usr2')

    def kill_worker(self, pid, sig):
        """Send ``sig`` to the worker process ``pid``, logging failures."""
        logging.info('killing worker {}'.format(pid))
        try:
            os.kill(pid, sig)
        except OSError as e:
            if e.errno == errno.ESRCH:
                logging.warning('worker pid: {} not found'.format(pid))
                # The process no longer exists, so stop tracking it;
                # otherwise ``stop`` would wait out its whole timeout.
                self.WORKERS.pop(pid, None)
            elif e.errno in (errno.EPERM, errno.EACCES):
                logging.warning(('cannot kill worker pid: {} '
                                 'permission denied').format(pid))
            else:
                # Stay best-effort, but do not hide unexpected failures.
                logging.warning('cannot kill worker pid: {} ({})'.format(
                    pid, e))

    def kill_each_worker(self, sig):
        """Send ``sig`` to every registered worker."""
        logging.info(self.WORKERS)
        # Iterate over a snapshot: kill_worker may drop vanished pids.
        for pid in list(self.WORKERS):
            self.kill_worker(pid, sig)

    def reap_all_workers(self):
        """Collect every exited child without blocking."""
        while True:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # ECHILD: there are no child processes left at all
                return
            if wpid == 0:
                # children exist, but none of them has exited yet
                return
            self.WORKERS.pop(wpid, None)
            logging.info('reaped worker {}, {}'.format(wpid, status))

    def master_sleep(self):
        """Block for up to 5 s or until ``awaken`` writes to the pipe."""
        try:
            # once the read end is ready the master is activated
            ready, _, _ = select([self.pipes[0]], [], [], 5)
        except InterruptedError:
            # EINTR: a signal arrived during select; the caller's loop
            # inspects SIG_QUEUE next, so simply return.
            return
        logging.info(ready)
        if ready:
            # The pipe must be drained, otherwise the read end stays ready
            # forever. The raw non-blocking read returns None when another
            # reader has already emptied it.
            logging.info(self._r.read(1024))
class Worker(object):
    """Placeholder job that runs inside a forked child process."""

    def __init__(self, _id):
        # Identifier handed in by whoever creates the worker.
        self.id = _id

    def work(self):
        """Idle forever, sleeping one second per iteration."""
        while 1:
            sleep(1)
if __name__ == '__main__':
    # Start forking only when executed as a script, so that importing this
    # module does not spawn workers and block in the master loop.
    s = Server()
    s.start().join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment