Last active
August 20, 2018 06:10
-
-
Save lxyu/4406686 to your computer and use it in GitHub Desktop.
A better TProcessPoolServer for Thrift: supports a process timeout and exposes active process count stats.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import logging | |
import multiprocessing | |
import signal | |
import time | |
from thrift.transport import TTransport | |
from thrift.server import TServer | |
def timeout_handler(signum, frame):
    """Signal handler that aborts the current work by raising TServerTimeout.

    The signature follows the ``signal.signal`` handler convention; neither
    ``signum`` nor ``frame`` is inspected.
    """
    raise TServerTimeout()
class TServerTimeout(Exception):
    """Raised by the alarm signal handler when a worker exceeds its timeout."""
class TProcessPoolServer(TServer.TServer):
    """Process-pool Thrift server with a connection timeout and load stats.

    Refines the stock process pool server with two additions:

    1. Timeout support: each worker arms ``signal.alarm`` when it starts
       serving a client and stops serving that client when the alarm fires.
    2. The number of workers currently serving a client is available through
       the ``active_count`` property.

    Positional arguments are forwarded unchanged to ``TServer.TServer``.

    Keyword arguments:
        pool_size: number of worker processes started by ``serve``
            (default 10).
        timeout: seconds passed to ``signal.alarm`` for each client
            connection (default 30).
    """

    def __init__(self, *args, **kwargs):
        TServer.TServer.__init__(self, *args)
        # Shared between the parent and all workers. The 'd' (double)
        # typecode is kept so ``active_count`` keeps returning a float for
        # existing callers.
        self._active_count = multiprocessing.Value('d', 0)
        self.pool_size = kwargs.get('pool_size', 10)
        self.timeout = kwargs.get('timeout', 30)
        self.workers = []

    def _handle(self):
        """Worker loop: accept clients and serve them until interrupted."""
        signal.signal(signal.SIGALRM, timeout_handler)
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    # Nothing was accepted; go back to waiting.
                    continue
                self._serve_client(client)
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as x:
                logging.exception(x)

    def _serve_client(self, client):
        """Serve one client connection until it closes or the timeout fires.

        The active counter is incremented for the duration of the call and
        the alarm is always cancelled on the way out.
        """
        # Pre-bind so the cleanup below is safe even when transport setup
        # raises before these names are assigned.
        itrans = None
        otrans = None

        # ``value += 1`` is a read-modify-write and is not atomic across
        # processes; the Value's own lock makes it so.
        with self._active_count.get_lock():
            self._active_count.value += 1
        try:
            signal.alarm(self.timeout)
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Raised when the transport fails, e.g. the peer hung up.
                pass
            except TServerTimeout:
                logging.error("Timed out!")
        finally:
            # Cancel the alarm first so it cannot fire in the middle of the
            # cleanup and leave the counter or the transports inconsistent.
            signal.alarm(0)
            try:
                if itrans is not None:
                    itrans.close()
                else:
                    # No wrapper was created, so close the raw client to
                    # avoid leaking the accepted connection.
                    client.close()
            finally:
                try:
                    if otrans is not None:
                        otrans.close()
                finally:
                    with self._active_count.get_lock():
                        self._active_count.value -= 1

    @property
    def active_count(self):
        """Number of workers currently serving a client."""
        return self._active_count.value

    def serve(self):
        """Start the worker pool and monitor its load until interrupted.

        Starts ``pool_size`` daemon worker processes, then checks once per
        second whether at least 80% of the pool is busy and logs a warning
        if so. Returns on ``SystemExit`` or ``KeyboardInterrupt``.
        """
        self.serverTransport.listen()
        for _ in range(self.pool_size):
            p = multiprocessing.Process(target=self._handle)
            p.daemon = True
            p.start()
            self.workers.append(p)

        while True:
            try:
                # Read once so the logged number is the one that was tested.
                active = self._active_count.value
                if active >= self.pool_size * 0.8:
                    logging.warning(
                        "Currently using %d workers, pool seems busy, "
                        "you may need to increase pool size!",
                        active)
                time.sleep(1)
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as e:
                logging.exception(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment