Last active
January 19, 2018 08:17
-
-
Save xiazhibin/11db1f2fd110be65767192baa25a7b02 to your computer and use it in GitHub Desktop.
pre-fork model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8
import os
import logging
import random
import signal
import time
import sys
import errno

# Name of the log file; FileHandler opens it relative to the working directory.
file_name = "tmp.log"

# Attach a file handler to the root logger so records from every logger that
# propagates to the root end up in the file, formatted as "<LEVEL> <message>".
h = logging.FileHandler(file_name)
h.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logging.root.setLevel(logging.INFO)
logging.root.addHandler(h)

# Logger used by the rest of this module.
logger = logging.getLogger(__name__)
class Worker(object):
    """Placeholder worker that idles for as long as its master is its parent.

    Args:
        ppid: PID of the master process this worker belongs to.
    """

    def __init__(self, ppid):
        self.ppid = ppid

    def run(self):
        """Sleep in one-second steps until the parent PID stops matching ``ppid``.

        The parent PID of a process changes when its original parent exits,
        so comparing it against the stored ``ppid`` lets the worker stop
        instead of idling forever as an orphan. Returns ``None`` once the
        mismatch is seen.
        """
        while os.getppid() == self.ppid:
            time.sleep(1)
class Master(object):
    """Pre-fork master: forks ``Worker`` children and supervises them.

    The master keeps ``worker_nums`` children alive and reacts to signals:

    * ``SIGINT``  -- stop the workers and leave the main loop.
    * ``SIGTTIN`` -- raise the target worker count by one.
    * ``SIGTTOU`` -- lower the target worker count by one (not below zero).
    * ``SIGCHLD`` -- reap exited children.

    INT/TTIN/TTOU are only queued by the signal handler and are dispatched
    later from ``run()`` to the matching ``handle_<name>`` method.

    Args:
        worker_nums: Number of worker processes to keep running.
    """

    # Class-level registries, shared by every instance unless shadowed on
    # the instance: pid -> Worker, and the queue of pending signal numbers.
    WORKERS = {}
    SIG_QUEUE = []

    # NOTE: both tables must stay above ``def signal`` below. Inside the class
    # body the bare name ``signal`` still resolves to the imported module
    # here; once the method is defined it would resolve to the method.
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "INT TTIN TTOU".split()]
    # Signal number -> lower-case short name ("int", "ttin", ...). Names that
    # start with "SIG_" (SIG_DFL, SIG_IGN, ...) are not signals and are skipped.
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name.startswith("SIG") and not name.startswith("SIG_")
    )

    def __init__(self, worker_nums=2):
        self.worker_nums = worker_nums
        self.master_name = 'Master'
        self.reexec_pid = 0
        self.pid = -1
        self.start()

    def start(self):
        """Record the master PID and install the signal handlers."""
        self.pid = os.getpid()
        logger.info('master {} start'.format(self.pid))
        self.init_signals()

    def init_signals(self):
        """Install ``self.signal`` for ``SIGNALS`` and the SIGCHLD handler."""
        # An explicit loop is required: map() is lazy on Python 3, so using
        # it only for its side effects would never install the handlers.
        for sig in self.SIGNALS:
            signal.signal(sig, self.signal)
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        """Signal handler: queue ``sig`` for ``run()``; at most 5 are kept."""
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)

    def handle_chld(self, sig, frame):
        """SIGCHLD handler: reap exited children immediately."""
        logger.info('get a chld')
        self.reap_workers()

    def handle_int(self):
        """Stop the workers, then end the main loop.

        Raises:
            StopIteration: always; ``run()`` catches it and returns.
        """
        self.stop()
        raise StopIteration

    def handle_ttin(self):
        """Increase the target worker count by one and rebalance."""
        logger.info('add a worker')
        self.worker_nums += 1
        self.manage_workers()

    def handle_ttou(self):
        """Decrease the target worker count by one (never below 0) and rebalance."""
        logger.info("deincrease a worker")
        if self.worker_nums > 0:
            self.worker_nums -= 1
        self.manage_workers()

    def run(self):
        """Main loop: reap children, dispatch queued signals, keep the pool sized.

        Returns when a handler raises ``StopIteration``. On
        ``KeyboardInterrupt`` or any other exception the workers are stopped
        non-gracefully and the process exits with status -1.
        """
        self.manage_workers()
        while True:
            try:
                self.reap_workers()
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    # Nothing pending: idle briefly, then re-check the pool.
                    time.sleep(1)
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    logger.info('Ignoring unknown signal: {}'.format(sig))
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if handler is None:
                    logger.error("Unhandled signal: %s" % signame)
                    continue
                logger.info("Handling signal: %s" % signame)
                handler()
            except StopIteration:
                break
            except KeyboardInterrupt:
                self.stop(False)
                sys.exit(-1)
            except Exception:
                logger.exception("Unhandled exception in main loop.")
                self.stop(False)
                sys.exit(-1)

    def stop(self, is_gracefule=True):
        """Signal every worker to exit, then SIGKILL whatever is left.

        Args:
            is_gracefule: When true send SIGQUIT first, otherwise SIGTERM.
        """
        logger.info('stop workers')
        sig = signal.SIGQUIT
        if not is_gracefule:
            sig = signal.SIGTERM
        self.kill_workers(sig)
        # Give the workers a moment to exit before reaping and force-killing.
        time.sleep(0.1)
        self.reap_workers()
        self.kill_workers(signal.SIGKILL)

    def halt(self, exit_status=0):
        """Stop the workers and terminate the master itself.

        Args:
            exit_status: Status passed to ``sys.exit``.
        """
        logger.info("master exit")
        self.stop()
        sys.exit(exit_status)

    def reap_workers(self):
        """Collect every exited child and drop it from ``WORKERS``.

        Reaping avoids zombie processes, whose resources would otherwise
        never be released.
        Reference: http://www.cnblogs.com/mickole/p/3187770.html
        """
        try:
            while True:
                # os.waitpid collects the status of a zombie child and
                # releases it. -1 means "any child"; os.WNOHANG makes the
                # call return at once when no child has exited yet.
                wpid, _status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    # With WNOHANG, (0, 0) means children exist but none has
                    # exited; without this check the loop would never end.
                    break
                logger.info("reap_workers:{0}".format(wpid))
                self.WORKERS.pop(wpid, None)
        except OSError as e:
            # errno.ECHILD ("no child processes") is the normal end of the
            # loop; anything else is unexpected, so leave a trace of it.
            if e.errno != errno.ECHILD:
                logger.warning("reap workers error: %s" % str(e))

    def manage_workers(self):
        """Health-check the pool: spawn missing workers, SIGTERM surplus ones."""
        if len(self.WORKERS) < self.worker_nums:
            self.spawn_workers()

        # Work on a snapshot of the PIDs: kill_worker may mutate WORKERS, and
        # dict views cannot be popped or indexed on Python 3.
        pids = list(self.WORKERS)
        surplus = len(pids) - self.worker_nums
        if surplus > 0:
            for pid in pids[:surplus]:
                self.kill_worker(pid, signal.SIGTERM)

    def spawn_worker(self):
        """Fork one worker.

        Returns:
            The child's PID, in the master. The child never returns from
            this method: it runs the worker and then exits.
        """
        worker = Worker(self.pid)
        pid = os.fork()
        # Master side: register the child and carry on.
        if pid != 0:
            self.WORKERS[pid] = worker
            return pid

        # Worker side: run until done, exiting 0 on success and -1 on error.
        worker_pid = os.getpid()
        try:
            logger.info("Worker %s booting" % worker_pid)
            worker.run()
            sys.exit(0)
        except SystemExit:
            raise
        except Exception as e:
            logger.info("work error %s" % str(e))
            sys.exit(-1)

    def spawn_workers(self):
        """Fork as many workers as are missing to reach ``worker_nums``."""
        for _ in range(self.worker_nums - len(self.WORKERS)):
            self.spawn_worker()

    def kill_workers(self, sig):
        """Send ``sig`` to every registered worker."""
        # Iterate over a snapshot: kill_worker and the SIGCHLD handler may
        # remove entries from WORKERS while this loop is running.
        for pid in list(self.WORKERS):
            self.kill_worker(pid, sig)

    def kill_worker(self, pid, sig):
        """Send ``sig`` to worker ``pid``; forget the worker if it is gone.

        Args:
            pid: PID of the worker to signal.
            sig: Signal number to send.
        """
        try:
            os.kill(pid, sig)
            kpid, _stat = os.waitpid(pid, os.WNOHANG)
            if kpid:
                logger.warning("Problem killing process: %s" % pid)
                # waitpid has just reaped this child, so reap_workers will
                # never see it; drop it here to avoid a stale entry.
                self.WORKERS.pop(pid, None)
        except OSError as e:
            # ESRCH: no such process. ECHILD: not (or no longer) our child.
            # Either way the PID is no longer a worker we manage.
            if e.errno in (errno.ESRCH, errno.ECHILD):
                self.WORKERS.pop(pid, None)
            logger.warning("kill worker error: %s" % str(e))
if __name__ == "__main__":
    # Script entry point: build a master with its default settings and
    # hand control to its main loop.
    master = Master()
    master.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment