Created
October 23, 2012 19:49
-
-
Save sporsh/3941124 to your computer and use it in GitHub Desktop.
A Twisted example to demo deferreds, client/server protocols and endpoints
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from twisted.python import failure | |
from twisted.internet import reactor, defer, task | |
from twisted.internet.protocol import Protocol | |
# Define some messages for our protocol | |
MSG_REQUEST = '>' | |
MSG_REQ_ACK = ':' | |
MSG_REQ_SUCCEEDED = '=' | |
MSG_REQ_FAILED = '!' | |
class ID10TRemoteProcedureCallProtocol(Protocol): | |
"""A trivial RPC-like protocol for handling concurrent requests | |
Demonstrates some features of Twisted like reactor, deferreds, endpoints... | |
Request messages start with '>' and a request name followed by arguments | |
>request_name [[arguments]...] | |
Responses are | |
': <request_id>' for acknowledged requests | |
'= <request_id> <result>' results for a request | |
'! <request_id> <error message>' for error message for a request | |
For detailed help on a specific request, send a help request message: | |
>help [request_name] | |
To list requests we have available handlers for: | |
>list | |
""" | |
def __init__(self): | |
self._request_id = 0 | |
def connectionMade(self): | |
print "Connection made" | |
def connectionLost(self, reason): | |
print "Connection lost: %s" % reason.value | |
def dataReceived(self, data): | |
"""Callback called whenever twisted receive data on the transport. | |
""" | |
# We don't know that data contains exactly whole messages, so | |
# we should buffer and defragment messages more properly here... | |
# ... but I'm lazy, so assume that we get whole messages: | |
for message in data.split('\n'): | |
self._dispatch_message(message.strip()) | |
def _dispatch_message(self, message): | |
"""Dispatches messages to correct handler. | |
""" | |
# You'd normally want to implement better error handling here | |
if not message: | |
return | |
try: | |
message_type = message[0] | |
if message_type == MSG_REQUEST: | |
parts = message[1:].split() | |
request, data = parts[0], parts[1:] | |
self._handle_request(request, data) | |
else : | |
print 'Unknown message %r' % message | |
except Exception as e: | |
print 'error:', e, "while dispatching message", repr(message) | |
def _handle_request(self, request_type, data): | |
"""Handle request of a certain type and given data | |
""" | |
self._request_id += 1 | |
# Send a message to the client that we started handling the request | |
message = ' '.join([MSG_REQ_ACK, str(self._request_id)]) | |
self.transport.write(message+'\n') | |
handler = getattr(self, 'handle_'+request_type.strip().upper(), None) | |
if not handler: | |
deferred = defer.fail("Unknown request type %r" % request_type) | |
else: | |
print "Handling request:", request_type, data | |
deferred = defer.maybeDeferred(handler, *data) | |
# Send the result when handling is done | |
deferred.addCallback(self._request_succeeded, self._request_id) | |
deferred.addErrback(self._request_failed, self._request_id) | |
def _request_succeeded(self, result, request_id): | |
"""Sends a result back to the client when the request is done. | |
This method is added as an callback to the deferred of the | |
request handler, so that it is triggered automatically when the result | |
is ready. | |
""" | |
message = ' '.join([MSG_REQ_SUCCEEDED, str(request_id), str(result)]) | |
self.transport.write(message+'\n') | |
def _request_failed(self, failure, request_id): | |
"""Sends a failure response to the client with a error message | |
""" | |
print "[%i] Request failed: %s" % (request_id, failure.value) | |
message = ' '.join([MSG_REQ_FAILED, str(request_id), str(failure.value)]) | |
self.transport.write(message+'\n') | |
# Always handle quit requests by droopping the connection | |
handle_QUIT = lambda self, *_: self.transport.loseConnection() | |
class ID10THelpHandlersMixin(object): | |
"""Handlers for self documenting the protocol | |
""" | |
def handle_HELP(self, name=''): | |
"""Display help text for a request if it is defined | |
>help [request] | |
""" | |
handler = (getattr(self, 'handle_'+name.strip().upper(), None) | |
or ID10TRemoteProcedureCallProtocol) | |
return handler.__doc__ or "No help for %r" % request | |
def handle_LIST(self): | |
"""List all request handlers | |
>list | |
""" | |
requests = [name.strip('handle_').lower() for name in dir(self) | |
if name.startswith('handle_')] | |
return 'Request handlers:\n' + '\n'.join(requests) | |
class ID10TSchedulingHandlersMixin(object): | |
"""Handlers that demonstrate blocking and non-blocing waits in the reactor. | |
""" | |
def handle_SLEEP(self, seconds, result=None): | |
"""Sleep for n seconds, then return a result | |
>sleep seconds [result] | |
""" | |
result = result or 'Slept for %s seconds' % seconds | |
deferred = defer.Deferred() | |
reactor.callLater(float(seconds), deferred.callback, result) | |
return deferred | |
def handle_BLOCK(self, seconds, result=None): | |
"""Block the reactor for n seconds, then return a result | |
>block seconds [result] | |
""" | |
time.sleep(float(seconds)) | |
return defer.succeed(result or 'Blocked for %s seconds' % seconds) | |
class ID10TExtraHandlersMixin(object): | |
"""Mixin with super simple handlers just for fun | |
""" | |
handle_PING = lambda self, *_: 'PONG!' | |
handle_ECHO = lambda self, *a: ' '.join(a) | |
handle_SUM = lambda self, *a: sum(float(i) for i in a) | |
handle_MIN = lambda self, *a: min(*a) | |
if __name__ == '__main__': | |
import sys | |
from twisted.internet import reactor | |
from twisted.internet.endpoints import serverFromString | |
from twisted.internet.protocol import Protocol, ServerFactory | |
# Get port number to listen on from argv, or let twisted decide... | |
port = int(sys.argv[1]) if len(sys.argv)==2 else 0 | |
# Get a Twisted server endpoint object from a description string | |
server = serverFromString(reactor, 'tcp:%i' % port) | |
# Set up the ID10T protocol as we want to use it | |
class ID10TRPCProtocol(ID10TRemoteProcedureCallProtocol, | |
ID10THelpHandlersMixin, | |
ID10TSchedulingHandlersMixin, | |
ID10TExtraHandlersMixin): | |
pass | |
# Start the ID10T server | |
server_factory = ServerFactory() | |
server_factory.protocol = ID10TRPCProtocol | |
server_factory.stopFactory = reactor.stop | |
server_listen_deferred = server.listen(server_factory) | |
@server_listen_deferred.addErrback | |
def server_listen_failed(failure): | |
"""Errback that is triggered if we fail to start the server | |
""" | |
print failure.value | |
reactor.stop() | |
@server_listen_deferred.addCallback | |
def server_listen_callback(twisted_port): | |
"""Callback that is triggered when the server has started listening | |
""" | |
print "Listening on port", twisted_port.getHost().port | |
reactor.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment