Last active
August 29, 2015 14:05
-
-
Save mengzhuo/01c092b56b609d051388 to your computer and use it in GitHub Desktop.
Gate.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import signal | |
import logging | |
logger = logging.getLogger(__name__) | |
import sys | |
import os | |
import multiprocessing | |
import gevent | |
from gevent import Greenlet, Timeout, socket | |
from gevent.server import StreamServer | |
def handle(sock, addr): | |
sock.send('Hello World\n') | |
while not sock.closed: | |
a = sock.recv(1) | |
logger.debug('pid:%d handle %s' % (os.getpid(), a)) | |
sock.close() | |
class Gate(StreamServer): | |
def __init__(self, *args, **kwargs): | |
super(Gate, self).__init__(*args, **kwargs) | |
self.process_number = max(int(os.getenv('GATE_PROCESS_NUMBER') or | |
multiprocessing.cpu_count()) -1, | |
1) | |
self.master = False | |
self._main_running = False | |
self.children = {} | |
def spawn_process(self): | |
pid = gevent.fork() | |
if pid: | |
# master | |
self.stop_accepting() | |
self.master = True | |
self.children[pid] = {'connections':0} | |
else: | |
# re-connect to signal | |
signal.signal(signal.SIGTERM, self.child_stop) | |
signal.signal(signal.SIGHUP, self.child_stop) | |
self._stop_event.clear() | |
self.master = False | |
try: | |
self.start_accepting() | |
except: | |
self.close() | |
raise | |
# self.ctlsock = child_sock | |
try: | |
self._stop_event.wait() | |
finally: | |
Greenlet.spawn(self.stop, timeout=None).join() | |
sys.exit(0) | |
def child_stop(self, *args, **kwargs): | |
try: | |
self.stop() | |
sys.exit(0) | |
except Exception as err: | |
logger.error(err, exc_info=True) | |
sys.exit(1) | |
def serve_forever(self): | |
if self._main_running: | |
return | |
self.init_socket() | |
self._main_running = True | |
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
for _ in xrange(self.process_number): | |
self.spawn_process() | |
logger.debug('Init ps:%s' % self.children.keys()) | |
try: | |
while self._main_running: | |
pid, _ = os.wait() | |
del self.children[pid] | |
self.spawn_process() | |
logger.debug("Current ps:%s" % self.children.keys()) | |
except KeyboardInterrupt: | |
self.stop() | |
finally: | |
sys.exit(0) | |
def stop(self, *args, **kwargs): | |
if self.master: | |
self._main_running = False | |
try: | |
with Timeout(10, OSError): | |
for child in self.children.keys(): | |
os.kill(child, signal.SIGTERM) | |
except OSError: | |
pass | |
except Exception as err: | |
logger.error(err, exc_info=True) | |
finally: | |
super(Gate, self).stop(*args, **kwargs) | |
else: | |
try: | |
logger.debug("%s stopping...." % os.getpid()) | |
super(Gate, self).stop(*args, **kwargs) | |
logger.debug('%s stopped' % os.getpid()) | |
except Exception as err: | |
logger.error(err, exc_info=True) | |
finally: | |
return | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.ERROR) | |
gate = Gate(('0.0.0.0', 8080), handle=handle) | |
gate.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment