Created
January 18, 2011 00:51
-
-
Save tmc/783810 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""The :mod:`zmq` module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking | |
""" | |
__zmq__ = __import__('zmq') | |
from gevent.hub import _threadlocal | |
from gevent.socket import wait_read, wait_write | |
__patched__ = ['Context', 'Socket'] | |
globals().update(dict([(var, getattr(__zmq__, var)) | |
for var in __zmq__.__all__ | |
if not (var.startswith('__') | |
or | |
var in __patched__) | |
])) | |
def Context(io_threads=1): | |
"""Factory function replacement for :class:`zmq.core.context.Context` | |
Ensures only be one :class:`_Context` instance per thread. | |
""" | |
try: | |
return _threadlocal.zmq_context | |
except AttributeError: | |
_threadlocal.zmq_context = _Context(io_threads) | |
return _threadlocal.zmq_context | |
class _Context(__zmq__.Context): | |
"""Internal subclass of :class:`zmq.core.context.Context` | |
.. warning:: Do not grab one of these yourself | |
""" | |
def socket(self, socket_type): | |
"""Overridden method to ensure that the green version of socket is used | |
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures | |
that a :class:`Socket` with all of its send and recv methods set to be | |
non-blocking is returned | |
""" | |
return Socket(self, socket_type) | |
class Socket(__zmq__.Socket): | |
"""Green version of :class:`zmq.core.socket.Socket | |
The following four methods are overridden: | |
* _send_message | |
* _send_copy | |
* _recv_message | |
* _recv_copy | |
To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving | |
is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised | |
""" | |
def _send_message(self, msg, flags=0): | |
flags |= __zmq__.NOBLOCK | |
while True: | |
try: | |
super(Socket, self)._send_message(msg, flags) | |
return | |
except __zmq__.ZMQError, e: | |
if e.errno != EAGAIN: | |
raise | |
wait_write(self.getsockopt(__zmq__.FD)) | |
def _send_copy(self, msg, flags=0): | |
flags |= __zmq__.NOBLOCK | |
while True: | |
try: | |
super(Socket, self)._send_copy(msg, flags) | |
return | |
except __zmq__.ZMQError, e: | |
if e.errno != EAGAIN: | |
raise | |
wait_write(self.getsockopt(__zmq__.FD)) | |
def _recv_message(self, flags=0, track=False): | |
flags |= __zmq__.NOBLOCK | |
while True: | |
try: | |
m = super(Socket, self)._recv_message(flags, track) | |
if m is not None: | |
return m | |
except __zmq__.ZMQError, e: | |
if e.errno != EAGAIN: | |
raise | |
wait_read(self.getsockopt(__zmq__.FD)) | |
def _recv_copy(self, flags=0): | |
flags |= __zmq__.NOBLOCK | |
while True: | |
try: | |
m = super(Socket, self)._recv_copy(flags) | |
if m is not None: | |
return m | |
except __zmq__.ZMQError, e: | |
if e.errno != EAGAIN: | |
raise | |
wait_read(self.getsockopt(__zmq__.FD)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment