Skip to content

Instantly share code, notes, and snippets.

@xiazhibin
Last active January 19, 2018 08:17
Show Gist options
  • Save xiazhibin/11db1f2fd110be65767192baa25a7b02 to your computer and use it in GitHub Desktop.
Save xiazhibin/11db1f2fd110be65767192baa25a7b02 to your computer and use it in GitHub Desktop.
pre-fork model
# coding=utf-8
import os, logging
import random
import signal
import time
import sys
import errno
# Log destination: a file named "tmp.log" (relative path, so it is resolved
# against the current working directory of the process).
file_name = "tmp.log"
# File handler whose records are rendered as "<LEVEL> <message>".
h = logging.FileHandler(file_name)
h.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
# Configure the ROOT logger: INFO threshold, output goes to the file handler.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(h)
# Rebind `logger` to this module's own logger; the rest of the file logs
# through this name. The root logger configured above keeps the handler.
logger = logging.getLogger(__name__)
class Worker(object):
    """A worker process body: idles in one-second steps while its parent lives.

    The instance is created by the master before forking and ``run`` is
    executed in the child process.
    """

    def __init__(self, ppid):
        """Remember the pid of the process that is expected to be our parent.

        :param ppid: pid of the master process that spawns this worker.
        """
        self.ppid = ppid

    def run(self):
        """Loop until the recorded parent is no longer our parent.

        Previously this was an unconditional ``while True`` and ``ppid`` was
        never consulted, so a worker whose master died abruptly (e.g. killed
        with SIGKILL) kept sleeping forever as an orphan. Comparing against
        ``os.getppid()`` lets the worker return once it has been re-parented;
        the check happens before the first sleep.
        """
        while self.ppid == os.getppid():
            time.sleep(1)
class Master(object):
    """Pre-fork master: forks ``Worker`` children and supervises them.

    Signals handled by the master:

    * ``SIGINT``  - queued; stops the workers and leaves the main loop.
    * ``SIGTTIN`` - queued; raises the target worker count by one.
    * ``SIGTTOU`` - queued; lowers the target worker count by one.
    * ``SIGCHLD`` - handled immediately; reaps exited children.

    Queued signals are only recorded by the handler and are dispatched from
    the main loop in ``run`` to a ``handle_<name>`` method.
    """

    # NOTE: these two containers are class attributes and therefore shared
    # by every Master instance in the process.
    WORKERS = {}    # pid -> Worker, for every child believed to be alive
    SIG_QUEUE = []  # pending signal numbers, oldest first

    # Signals that are queued for the main loop.
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "INT TTIN TTOU".split()]

    # Signal number -> lower-case short name ("int", "ttin", ...), built from
    # every SIGxxx constant in the signal module (SIG_DFL, SIG_IGN and other
    # SIG_* names are excluded).
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name.startswith("SIG") and not name.startswith("SIG_")
    )

    def __init__(self, worker_nums=2):
        """Create the master and immediately install its signal handlers.

        :param worker_nums: number of worker processes to keep alive.
        """
        self.worker_nums = worker_nums
        self.master_name = 'Master'
        self.reexec_pid = 0
        self.pid = -1
        self.start()

    def start(self):
        """Record the master's pid, log it and install the signal handlers."""
        self.pid = os.getpid()
        logger.info('master {} start'.format(self.pid))
        self.init_signals()

    def init_signals(self):
        """Install the queueing handler for SIGNALS and the SIGCHLD handler.

        A plain loop is used on purpose: ``map()`` is lazy on Python 3, so
        calling it only for its side effects would install nothing.
        """
        for sig in self.SIGNALS:
            signal.signal(sig, self.signal)
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        """Signal handler: queue ``sig`` for the main loop (at most 5 pending)."""
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)

    def handle_chld(self, sig, frame):
        """SIGCHLD handler: reap exited children right away."""
        logger.info('get a chld')
        self.reap_workers()

    def handle_int(self):
        """SIGINT: stop the workers, then break out of the main loop.

        :raises StopIteration: always; ``run`` catches it and returns.
        """
        self.stop()
        raise StopIteration

    def handle_ttin(self):
        """SIGTTIN: raise the target worker count by one and spawn."""
        logger.info('add a worker')
        self.worker_nums += 1
        self.manage_workers()

    def handle_ttou(self):
        """SIGTTOU: lower the target worker count by one (never below zero)."""
        logger.info("deincrease a worker")
        if self.worker_nums > 0:
            self.worker_nums -= 1
            self.manage_workers()

    def run(self):
        """Main loop: keep the worker pool at size and dispatch queued signals.

        Returns normally after SIGINT has been handled. On KeyboardInterrupt
        or any unexpected exception the workers are stopped non-gracefully
        and the process exits with status -1.
        """
        self.manage_workers()
        while True:
            try:
                self.reap_workers()
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    # Nothing pending: idle, then re-check the pool size.
                    time.sleep(1)
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    logger.info('Ignoring unknown signal: {}'.format(sig))
                    continue
                sig_name = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % sig_name, None)
                if handler is None:
                    logger.error("Unhandled signal: %s" % sig_name)
                    continue
                logger.info("Handling signal: %s" % sig_name)
                handler()
            except StopIteration:
                break
            except KeyboardInterrupt:
                self.stop(False)
                sys.exit(-1)
            except Exception:
                logger.exception("Unhandled exception in main loop.")
                self.stop(False)
                sys.exit(-1)

    def stop(self, is_gracefule=True):
        """Stop all workers.

        Sends SIGQUIT (graceful) or SIGTERM (non-graceful) to every worker,
        waits 0.1 s, reaps whatever has exited and finally sends SIGKILL to
        the workers that are still registered.

        :param is_gracefule: True to signal with SIGQUIT, False for SIGTERM.
        """
        logger.info('stop workers')
        sig = signal.SIGQUIT if is_gracefule else signal.SIGTERM
        self.kill_workers(sig)
        time.sleep(0.1)
        self.reap_workers()
        self.kill_workers(signal.SIGKILL)

    def halt(self, exit_status=0):
        """Stop the workers gracefully and exit the master process.

        :param exit_status: status passed to ``sys.exit``.
        """
        logger.info("master exit")
        self.stop()
        sys.exit(exit_status)

    def reap_workers(self):
        """Collect every already-exited child and drop it from ``WORKERS``.

        Reaping avoids zombie processes, whose resources would otherwise
        never be released.
        Reference: http://www.cnblogs.com/mickole/p/3187770.html
        """
        try:
            while True:
                # os.waitpid collects a zombie child's status and destroys it.
                # -1 means "any child"; os.WNOHANG makes the call return
                # immediately when no child has exited.
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    # (0, 0): children exist but none has exited. The pid is
                    # never None, so testing for None looped forever here.
                    break
                logger.info("reap_workers:{0}".format(wpid))
                self.WORKERS.pop(wpid, None)
        except OSError as e:
            # errno.ECHILD just means there are no children left to wait for.
            if e.errno != errno.ECHILD:
                logger.warning("reap workers error: %s" % str(e))

    def manage_workers(self):
        """Health check: bring the number of workers in line with the target.

        Spawns the missing workers, or sends SIGTERM to the surplus ones.
        """
        if len(self.WORKERS) < self.worker_nums:
            self.spawn_workers()
        # Work on a snapshot: dict views cannot be popped from, and the
        # SIGCHLD handler may mutate WORKERS while we iterate.
        pids = list(self.WORKERS)
        surplus = len(pids) - self.worker_nums
        if surplus > 0:
            for pid in pids[:surplus]:
                self.kill_worker(pid, signal.SIGTERM)

    def spawn_worker(self):
        """Fork one worker.

        In the master: registers the child in ``WORKERS`` and returns its pid.
        In the child: runs the worker and exits the process (status 0 when
        ``Worker.run`` returns, -1 when it raises); never returns.
        """
        worker = Worker(self.pid)
        pid = os.fork()
        # Master process.
        if pid != 0:
            self.WORKERS[pid] = worker
            return pid
        # Worker process.
        worker_pid = os.getpid()
        try:
            logger.info("Worker %s booting" % worker_pid)
            worker.run()
            sys.exit(0)
        except SystemExit:
            raise
        except Exception as e:
            logger.info("work error %s" % str(e))
            sys.exit(-1)

    def spawn_workers(self):
        """Fork as many workers as are missing to reach ``worker_nums``."""
        for _ in range(self.worker_nums - len(self.WORKERS)):
            self.spawn_worker()

    def kill_workers(self, sig):
        """Send ``sig`` to every registered worker."""
        # Snapshot the pids: kill_worker may remove entries from WORKERS.
        for pid in list(self.WORKERS):
            self.kill_worker(pid, sig)

    def kill_worker(self, pid, sig):
        """Send ``sig`` to one worker; errors are logged, never raised.

        If the child is reaped here, or turns out to be gone already, its
        entry is removed from ``WORKERS``. Otherwise a stale pid would stay
        registered forever, block respawning and be signalled again later.
        """
        try:
            os.kill(pid, sig)
            kpid, _ = os.waitpid(pid, os.WNOHANG)
            if kpid:
                # The child was reaped right here, so reap_workers will
                # never see it; unregister it now.
                self.WORKERS.pop(kpid, None)
                logger.warning("Problem killing process: %s" % pid)
        except OSError as e:
            if e.errno in (errno.ESRCH, errno.ECHILD):
                # No such process / not our child any more.
                self.WORKERS.pop(pid, None)
            logger.warning("kill worker error: %s" % str(e))
if __name__ == "__main__":
    # Script entry point: build a master with the default worker count and
    # hand control to its main loop.
    master = Master()
    master.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment