-
-
Save fabware/3320873 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent | |
from thrift.server.TServer import TServer | |
# XXX Hackish, but should be safe: monkey patch gevent socket support into | |
# Thrift. Overall I think this is cleaner than reimplementing all of TSocket. | |
from thrift.transport import TSocket; TSocket.socket = gevent.socket | |
from thrift.transport.TTransport import TTransportException | |
class TGEventServer(TServer): | |
"""Gevent socket server.""" | |
def serve(self): | |
self.serverTransport.listen() | |
while True: | |
client = self.serverTransport.accept() | |
gevent.spawn(self._process_socket, client) | |
def _process_socket(self, client): | |
"""A greenlet for handling a single client.""" | |
itrans = self.inputTransportFactory.getTransport(client) | |
otrans = self.outputTransportFactory.getTransport(client) | |
iprot = self.inputProtocolFactory.getProtocol(itrans) | |
oprot = self.outputProtocolFactory.getProtocol(otrans) | |
try: | |
while True: | |
self.processor.process(iprot, oprot) | |
except TTransportException, e: | |
pass | |
except Exception, e: | |
pass | |
itrans.close() | |
otrans.close() | |
# see https://issues.apache.org/jira/secure/attachment/12436303/TMultiplexServer.py | |
class TGeventMultiplexServer(TMultiplexServer): | |
def __init__(self, *args): | |
TMultiplexServer.__init__(self, *args); | |
def serve(self): | |
self.serverTransport.listen() | |
while True: | |
client = self.serverTransport.accept() | |
gevent.spawn(self._process_socket, client) | |
def _process_socket(self, client): | |
"""A greenlet for handling a single client.""" | |
inputTransport = self.inputTransportFactory.getTransport(client); | |
outputTransport = self.outputTransportFactory.getTransport(client); | |
inputProtocol = self.inputProtocolFactory.getProtocol(inputTransport); | |
outputProtocol = self.outputProtocolFactory.getProtocol(outputTransport); | |
try: | |
while True: | |
serviceNameMsg = inputProtocol.readMessageBegin(); | |
sname, stype, sseqid = serviceNameMsg; | |
if stype == TMessageType.SERVICE_SELECTION: | |
if sname in self.processors_: | |
processorFactory = self.processors_[sname]; | |
processor = processorFactory.getProcessor(client); | |
processor.process(inputProtocol, outputProtocol); | |
else: | |
outputProtocol.writeMessageBegin(sname, TMessageType.EXCEPTION, sseqid); | |
x = TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid service name: '" + sname + "'"); | |
x.write(outputProtocol); | |
outputProtocol.writeMessageEnd(); | |
outputProtocol.trans.flush(); | |
else: | |
#FIXME: don't know what to expect inside this message. | |
#What to do now? | |
x = TApplicationException(TApplicationException.INVALID_MESSAGE_TYPE, "Expected service selection (" + str(TMessageType.SERVICE_SELECTION) + "), but got: " + str(serviceNameMsg)); | |
outputProtocol.writeMessageBegin(sname, TMessageType.EXCEPTION, sseqid); | |
x.write(outputProtocol); | |
outputProtocol.writeMessageEnd(); | |
outputProtocol.trans.flush(); | |
inputProtocol.readMessageEnd(); | |
except TTransport.TTransportException, tx: | |
pass; | |
except Exception, x: | |
logging.exception(x) | |
inputTransport.close() | |
outputTransport.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment