Skip to content

Instantly share code, notes, and snippets.

@lxyu
Last active August 20, 2018 06:10
Show Gist options
  • Save lxyu/4406686 to your computer and use it in GitHub Desktop.
Save lxyu/4406686 to your computer and use it in GitHub Desktop.
A better TProcessPoolServer for Thrift: supports a processing timeout and exposes the active process count for stats.
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import signal
import time
from thrift.transport import TTransport
from thrift.server import TServer
def timeout_handler(signum, frame):
    """Signal handler that turns a delivered signal into ``TServerTimeout``.

    Both arguments are the standard signal-handler parameters and are not
    inspected here.
    """
    raise TServerTimeout()
class TServerTimeout(Exception):
    """Raised by the alarm signal handler when serving a client takes too long."""
class TProcessPoolServer(TServer.TServer):
    """Process-pool Thrift server with a connection timeout and usage stats.

    Adds two features on top of a plain process-pool server:

    1. Timeout support: every accepted connection is guarded by a SIGALRM
       timer of ``timeout`` seconds. When it fires, serving that connection
       is aborted, "Timed out!" is logged and the worker goes back to
       accepting (the worker process itself keeps running).
    2. The number of workers currently serving a client is available via
       the ``active_count`` property.

    Positional arguments are forwarded unchanged to ``TServer.TServer``.

    Keyword arguments:
        pool_size: number of worker processes started by ``serve``
            (default 10).
        timeout: seconds passed to ``signal.alarm`` for each connection
            (default 30).
    """

    def __init__(self, *args, **kwargs):
        TServer.TServer.__init__(self, *args)
        # Shared *integer* counter. Its built-in lock is taken around every
        # update because ``value += 1`` is a non-atomic read-modify-write
        # when several worker processes touch it.
        self._active_count = multiprocessing.Value('i', 0)
        self.pool_size = kwargs.get('pool_size', 10)
        self.timeout = kwargs.get('timeout', 30)
        self.workers = []

    def _handle(self):
        """Worker loop: accept connections and serve them one at a time.

        Runs as the target of each worker process. Leaves the loop on
        ``SystemExit``/``KeyboardInterrupt``; any other exception is logged
        and the loop continues.
        """
        # Installed here so the handler is registered in the process that
        # actually arms the alarm.
        signal.signal(signal.SIGALRM, timeout_handler)
        while True:
            try:
                client = self.serverTransport.accept()
                self._serve_client(client)
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as x:
                logging.exception(x)

    def _serve_client(self, client):
        """Process requests from ``client`` until the transport fails or the
        alarm fires, keeping the shared active counter up to date.
        """
        # Pre-bind so the cleanup below never hits an unbound name when a
        # transport factory raises.
        itrans = None
        otrans = None
        with self._active_count.get_lock():
            self._active_count.value += 1
        try:
            signal.alarm(self.timeout)
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Treated as the end of the connection (presumably the peer
                # disconnected); nothing to report.
                pass
            except TServerTimeout:
                logging.error("Timed out!")
        finally:
            # Cancel the timer first so it cannot fire in the middle of the
            # cleanup or leak into the next accept().
            signal.alarm(0)
            # Restore the counter before closing: a failing close() must not
            # leave this worker counted as busy forever.
            with self._active_count.get_lock():
                self._active_count.value -= 1
            try:
                if itrans is not None:
                    itrans.close()
            finally:
                if otrans is not None:
                    otrans.close()

    @property
    def active_count(self):
        """Number of workers that are currently serving a client."""
        return self._active_count.value

    def serve(self):
        """Start ``pool_size`` daemon workers, then monitor pool usage.

        After starting the workers this loops forever, checking once per
        second whether at least 80% of the pool is busy and logging a
        warning if so. The loop ends on ``SystemExit``/``KeyboardInterrupt``.
        """
        self.serverTransport.listen()
        for _ in range(self.pool_size):
            worker = multiprocessing.Process(target=self._handle)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        busy_threshold = self.pool_size * 0.8
        while True:
            try:
                active = self._active_count.value
                if active >= busy_threshold:
                    logging.warning(
                        "Currently using %d workers, pool seems busy, "
                        "you may need to increase pool size!", active)
                time.sleep(1)
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as e:
                logging.exception(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment