Created June 26, 2012 10:04
Handling REQ/REP synchronization in eventlet
from eventlet.green import zmq  # green zmq, so send/recv yield to other greenthreads
from eventlet.pools import Pool
from eventlet.timeout import Timeout


class SocketPool(Pool):
    """A pool of sockets connected to a component

    If a socket times out in use, simply close it before handing it back to the
    pool and it will be discarded and a replacement inserted into the pool."""

    def __init__(self, address, **kwargs):
        self.address = address
        super(SocketPool, self).__init__(**kwargs)

    def __str__(self):
        return self.address

    def __repr__(self):
        return "<SocketPool: %s>" % self

    def create(self):
        socket = zmq.Context().socket(zmq.REQ)
        socket.connect(self.address)
        # LINGER 0: close() drops unsent messages instead of blocking on them
        socket.setsockopt(zmq.LINGER, 0)
        return socket

    def put(self, socket):
        """Wrapper around superclass put, replacing socket if closed"""
        if socket.closed:
            socket = self.create()
        super(SocketPool, self).put(socket)


def do_req_rep(destination_pool, message, timeout=5):
    """Send message and return the reply, or None if no reply arrives in time.

    The 5 second default is an assumption; the original passed no value to
    Timeout(), which never fires."""
    with destination_pool.item() as socket:
        try:
            with Timeout(timeout):
                socket.send(message)
                return socket.recv()
        except Timeout:
            # Close the socket so that put() replaces it with a fresh one
            socket.close()
            return None
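The replace-on-put behaviour described in the docstring can be tried without ZeroMQ or eventlet installed. What follows is a minimal, dependency-free sketch, not the gist's actual code: `FakeSocket` and `ReplacingPool` are hypothetical stand-ins for a REQ socket and for `SocketPool`, and the pool here creates its items eagerly for simplicity. The motivation is that a REQ socket which has sent a request but never received the reply cannot send again, so it is closed and swapped for a fresh one rather than reused.

```python
import queue
from contextlib import contextmanager


class FakeSocket:
    """Hypothetical stand-in for a zmq REQ socket; only tracks open/closed state."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ReplacingPool:
    """Minimal pool that swaps a closed item for a fresh one in put()."""

    def __init__(self, size):
        self.created = 0
        self._free = queue.Queue()
        for _ in range(size):
            self._free.put(self.create())

    def create(self):
        self.created += 1
        return FakeSocket()

    def get(self):
        return self._free.get()

    def put(self, sock):
        # Same check as SocketPool.put: a closed socket is never returned to the pool
        if sock.closed:
            sock = self.create()
        self._free.put(sock)

    @contextmanager
    def item(self):
        sock = self.get()
        try:
            yield sock
        finally:
            self.put(sock)


pool = ReplacingPool(size=2)

# A healthy round trip hands the same socket back; nothing new is created.
with pool.item() as sock:
    pass
healthy_created = pool.created

# Simulate a timed-out request: close the socket before it is handed back.
with pool.item() as sock:
    sock.close()
after_timeout_created = pool.created

print(healthy_created, after_timeout_created)
```

The pool size stays constant in both cases; only the second round trip causes a new socket to be created.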
As mentioned in the article, timeouts work. The problem is that they are timeouts. In many cases the signalling from TCP would report the failure much earlier.
I should also have imported the greened version of zmq for this to work (from eventlet.green import zmq)