Skip to content

Instantly share code, notes, and snippets.

@fabware
Forked from imlucas/thrift_gevent.py
Created August 11, 2012 04:25
Show Gist options
  • Save fabware/3320873 to your computer and use it in GitHub Desktop.
Save fabware/3320873 to your computer and use it in GitHub Desktop.
import logging

import gevent
from thrift.Thrift import TApplicationException, TMessageType
from thrift.server.TServer import TServer
# XXX Hackish, but should be safe: monkey patch gevent socket support into
# Thrift. Overall I think this is cleaner than reimplementing all of TSocket.
from thrift.transport import TSocket; TSocket.socket = gevent.socket
from thrift.transport.TTransport import TTransportException
class TGEventServer(TServer):
    """Gevent socket server.

    Accepts connections on ``self.serverTransport`` and hands each accepted
    client to its own greenlet, which runs ``self.processor`` against it
    until the connection ends.
    """

    def serve(self):
        """Listen on the server transport and spawn one greenlet per client.

        Loops forever; it only stops if ``listen()`` or ``accept()`` raises.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                # NOTE(review): presumably some transports can return a falsy
                # value from accept() (e.g. a failed handshake) -- skip it
                # instead of spawning a handler that can only fail.
                continue
            gevent.spawn(self._process_socket, client)

    def _process_socket(self, client):
        """A greenlet for handling a single client.

        Wraps ``client`` with the configured transport and protocol
        factories and calls ``self.processor.process`` in a loop until an
        exception ends the connection.  Both transports are always closed
        on the way out.

        :param client: the transport returned by ``serverTransport.accept()``.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # Presumably raised when the peer disconnects; treated as the
            # normal end of the connection, so nothing is reported.
            pass
        except Exception as err:
            # Anything else is unexpected: record it (with traceback) rather
            # than silently dropping it.
            logging.exception(err)
        finally:
            # ``finally`` also covers exceptions that are not ``Exception``
            # subclasses (e.g. the greenlet being killed), so the transports
            # are not leaked.
            try:
                itrans.close()
            finally:
                otrans.close()
# see https://issues.apache.org/jira/secure/attachment/12436303/TMultiplexServer.py
class TGeventMultiplexServer(TMultiplexServer):
    """Multiplexing Thrift server that serves each client in a greenlet.

    NOTE(review): ``TMultiplexServer`` is not imported in the visible part
    of this module; presumably it comes from the patch linked in the comment
    above this class -- confirm and import it.  The same patch presumably
    adds ``TMessageType.SERVICE_SELECTION``.
    """

    def __init__(self, *args):
        """Forward all positional arguments to ``TMultiplexServer.__init__``."""
        TMultiplexServer.__init__(self, *args)

    def serve(self):
        """Listen on the server transport and spawn one greenlet per client.

        Loops forever; it only stops if ``listen()`` or ``accept()`` raises.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                # NOTE(review): presumably some transports can return a falsy
                # value from accept() -- skip it instead of spawning a
                # handler that can only fail.
                continue
            gevent.spawn(self._process_socket, client)

    def _send_error(self, oprot, name, seqid, error):
        """Write ``error`` to ``oprot`` as an EXCEPTION message and flush."""
        oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
        error.write(oprot)
        oprot.writeMessageEnd()
        oprot.trans.flush()

    def _process_socket(self, client):
        """A greenlet for handling a single client.

        Each loop iteration reads one message header.  If its type is
        ``TMessageType.SERVICE_SELECTION`` and its name is a key of
        ``self.processors_``, the processor obtained from that entry handles
        the call; otherwise a ``TApplicationException`` is sent back.  The
        loop runs until an exception ends the connection, and both
        transports are always closed on the way out.

        :param client: the transport returned by ``serverTransport.accept()``.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            while True:
                header = iprot.readMessageBegin()
                sname, stype, sseqid = header
                if stype != TMessageType.SERVICE_SELECTION:
                    # FIXME (from the original): the content of a message
                    # that is not a service selection is unknown here, so
                    # only an error reply is sent.
                    self._send_error(oprot, sname, sseqid, TApplicationException(
                        TApplicationException.INVALID_MESSAGE_TYPE,
                        "Expected service selection ("
                        + str(TMessageType.SERVICE_SELECTION)
                        + "), but got: " + str(header)))
                elif sname in self.processors_:
                    processor = self.processors_[sname].getProcessor(client)
                    processor.process(iprot, oprot)
                else:
                    self._send_error(oprot, sname, sseqid, TApplicationException(
                        TApplicationException.UNKNOWN_METHOD,
                        "Invalid service name: '" + sname + "'"))
                # NOTE(review): the original indentation was lost; this call
                # is placed at loop level so that it pairs with the
                # readMessageBegin() above on every iteration -- confirm.
                iprot.readMessageEnd()
        except TTransportException:
            # Presumably raised when the peer disconnects; treated as the
            # normal end of the connection, so nothing is reported.
            pass
        except Exception as err:
            logging.exception(err)
        finally:
            # ``finally`` also covers exceptions that are not ``Exception``
            # subclasses (e.g. the greenlet being killed), so the transports
            # are not leaked.
            try:
                itrans.close()
            finally:
                otrans.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment