-
-
Save jdmaturen/853845 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import gevent | |
from gevent.queue import Queue | |
from thrift.server.TServer import TServer | |
from thrift.transport.TTransport import TTransportException | |
class TGEventServer(TServer):
    """Gevent socket server.

    Every accepted client connection is handled in its own greenlet, so the
    number of concurrently served clients is not bounded by this class.
    """

    def serve(self):
        """Listen on the server transport and spawn one greenlet per client.

        Loops forever; an exception raised by ``listen()`` or ``accept()``
        propagates to the caller.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                # Some server transports report a failed accept by returning
                # a falsy value instead of raising; there is nothing to serve.
                continue
            gevent.spawn(self._process_socket, client)

    def _process_socket(self, client):
        """A greenlet for handling a single client.

        Wraps ``client`` in the configured transports and protocols, then
        processes requests until the processor raises.  The transports are
        always closed on the way out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # Transport-level failure (typically the peer went away): this is
            # how the request loop normally ends, so it is not logged.
            pass
        except Exception as x:
            # Anything else is unexpected; record it instead of silently
            # dropping it, matching what TGEventPoolServer does.
            logging.exception(x)
        finally:
            # ``finally`` also covers exits that are not ``Exception``
            # subclasses (e.g. the greenlet being killed), which would
            # otherwise skip the close calls and leak the connection.
            itrans.close()
            otrans.close()
class TGEventPoolServer(TServer):
    """Gevent socket server with fixed pool of processors.

    ``serve()`` starts ``self.greenlets`` worker greenlets that pull accepted
    clients from a shared queue.  A worker serves one client at a time until
    that client's request loop ends, so at most ``self.greenlets`` clients are
    being processed at once; further clients wait in the queue.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the server.

        Positional arguments are forwarded to ``TServer.__init__``.  Keyword
        arguments are accepted but currently unused.
        """
        TServer.__init__(self, *args)
        self.clients = Queue()  # accepted clients waiting for a free worker
        self.greenlets = 10  # number of workers started by serve()

    def setNumGreenlets(self, num):
        """Set the number of worker greenlets that should be created.

        Only takes effect if called before ``serve()``, which reads the value
        once when it spawns the workers.
        """
        self.greenlets = num

    def serveGreenlet(self):
        """Loop around getting clients from the shared queue and process them.

        Never returns on its own; any ``Exception`` is logged and the worker
        moves on to the next queued client.
        """
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible.

        Wraps ``client`` in the configured transports and protocols, then
        processes requests until the processor raises.  The transports are
        always closed on the way out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # Transport-level failure (typically the peer went away): this is
            # how the request loop normally ends, so it is not logged.
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # ``finally`` also covers exits that are not ``Exception``
            # subclasses (e.g. the worker greenlet being killed), which would
            # otherwise skip the close calls and leak the connection.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker greenlets and put client into a queue.

        Loops forever accepting clients; errors from ``accept()`` are logged
        and the loop continues.
        """
        for _ in range(self.greenlets):
            try:
                gevent.spawn(self.serveGreenlet)
            except Exception as x:
                logging.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    # Some server transports report a failed accept by
                    # returning a falsy value; do not queue it for a worker.
                    continue
                self.clients.put(client)
            except Exception as x:
                logging.exception(x)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment