Created
April 16, 2014 02:51
-
-
Save svvitale/e5bb806a754184455dc4 to your computer and use it in GitHub Desktop.
Mixin enabling any Python class to be a ZMQ inproc listener
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class _Singleton(type): | |
"""Singleton class from Stack Overflow: | |
http://stackoverflow.com/a/6798042 | |
""" | |
_instances = {} | |
def __call__(cls, *args, **kwargs): | |
if cls not in cls._instances: | |
cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs) | |
return cls._instances[cls] | |
class Singleton(_Singleton('SingletonMeta', (object,), {})):
    """Base class for singletons: derive from it to get one shared instance.

    The base is an intermediate class created by calling the ``_Singleton``
    metaclass directly, which applies the metaclass without using either the
    Python 2 or the Python 3 metaclass syntax.
    """
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest2 | |
import zmqextended | |
import zmq | |
import time | |
import threading | |
class InprocListenerMixinTest(unittest2.TestCase):
    """Tests for zmqextended.InprocListenerMixin."""

    def test_class_method(self):
        """Test that the InprocListenerMixin has correctly implemented the default_inproc_addr class method.
        """
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(zmqextended.InprocListenerMixin.default_inproc_addr(), "inproc://InprocListenerMixin")

    def test_mixin(self):
        """Test that we can create a class that listens just by deriving from the InprocListenerMixin.
        """
        class INeedToListen(zmqextended.InprocListenerMixin, object):
            def __init__(self):
                # Initialize the counter before the mixin starts the listener thread, so the
                # handler can never run against an instance that lacks the attribute.
                self.handled_requests = 0
                super(INeedToListen, self).__init__(self.handler)

            def handler(self, options):
                # Count the request and echo it back as the reply.
                self.handled_requests += 1
                return options

        # Create an instance of our local class. It will start listening immediately.
        test_instance = INeedToListen()

        # Test to make sure we got a correct inproc address: the class-based prefix plus a
        # per-instance suffix.
        self.assertTrue(test_instance.inproc_addr().startswith("inproc://INeedToListen"))
        self.assertNotEqual(test_instance.inproc_addr(), "inproc://INeedToListen")

        # Send a message to our local instance
        test_zmqsend = zmqextended.ZMQSender(test_instance.inproc_addr())
        reply = test_zmqsend.send_sync({"expected": 1})

        # Verify that the message was handled and that the handler's return value came back.
        self.assertEqual(test_instance.handled_requests, 1)
        self.assertEqual(reply, {"expected": 1})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import threading
import uuid

import zmq

from singleton import Singleton
class ZMQContext(zmq.Context, Singleton):
    """A zmq.Context that is also a Singleton.

    Instantiate it as ``ZMQContext()`` wherever a context is needed; the
    constructor takes no arguments.
    """

    def __init__(self):
        # Nothing of our own to set up: hand over to the base-class initializers.
        super(ZMQContext, self).__init__()
class ZMQThread(threading.Thread):
    """Abstract base class for ZMQ listener/sender threads.

    Handles management of the stop event, and enforces that subclasses
    override run() before the thread can be started.
    """

    # Poll timeout, in milliseconds, used by the worker loops between checks
    # of the stop event.
    DEFAULT_POLLING_INTERVAL_MS = 100

    def __init__(self, addr):
        """Set up the thread without starting it.

        addr -- ZMQ endpoint address the subclass will bind or connect to.
        """
        # Call the base threading.Thread constructor and set up this thread as a daemon thread (so it will terminate
        # on program exit).
        super(ZMQThread, self).__init__()
        self.daemon = True

        # Set up a cancellation event
        self._stop_event = threading.Event()

        # Save off the address
        self._addr = addr

        # Initialize request/reply members
        self._request = self._reply = None
        self._handler = None

    def start(self):
        """Start the thread.

        Raises NotImplementedError if the subclass has not overridden run().
        """
        # Enforce that we have a run method defined in our child class.
        # Compare the function underneath the bound method with the raw
        # function stored on this class. Going through ZMQThread.__dict__
        # works on both Python 2 and Python 3, whereas ZMQThread.run.__func__
        # does not exist on Python 3 (class attribute access yields a plain
        # function there).
        if getattr(self.run, "__func__", None) is ZMQThread.__dict__["run"]:
            raise NotImplementedError("%s requires a run() method" % (self.__class__.__name__,))

        # Call the parent's method to start the thread.
        super(ZMQThread, self).start()

    def run(self):
        """Thread body; must be overridden by subclasses."""
        raise NotImplementedError("%s requires a run() method" % (self.__class__.__name__,))

    def stop(self):
        """Ask the worker loop to exit by setting the cancellation event."""
        self._stop_event.set()
class ZMQListener(ZMQThread):
    """Convenience class for receiving python objects on a ZMQ endpoint."""

    def __init__(self, addr, handler):
        """Construct a ZMQ listener.

        addr -- ZMQ endpoint address on which we will be listening.
        handler -- Callback to be invoked when a request is received. Must take one argument which
        is the request as a python object. The handler's return value will be forwarded back to the sender.

        Raises ValueError if handler is not callable.
        """
        # Make sure the caller specified a callable handler (function or method) before any
        # thread state is set up.
        if not callable(handler):
            raise ValueError("handler argument must be callable!")

        super(ZMQListener, self).__init__(addr)
        self._handler = handler

    def run(self):
        """Thread worker.

        Binds a DEALER socket to the address and, until stop() is called, polls for requests,
        passes each one to the handler and sends the handler's return value back. If the handler
        raises, the exception propagates out of this method and the socket is closed on the way out.
        """
        # Instantiate the dealer socket
        # NOTE(review): a bound DEALER socket does not address replies to the peer that sent the
        # request; with more than one connected sender a reply may reach a different peer.
        # Confirm whether a ROUTER socket is needed for multi-sender use.
        zmq_socket = ZMQContext().socket(zmq.DEALER)

        try:
            # Start listening on the specified address
            zmq_socket.bind(self._addr)

            # Continue polling until we've been told to stop. The poll timeout bounds how long
            # a stop() request can go unnoticed.
            while not self._stop_event.is_set():
                if zmq_socket.poll(ZMQThread.DEFAULT_POLLING_INTERVAL_MS) != 0:
                    # Ready to receive a packet.
                    # SECURITY: recv_pyobj unpickles whatever arrives, so only listen on
                    # endpoints reachable by trusted peers.
                    request = zmq_socket.recv_pyobj()

                    # Successfully got a message, call the handler
                    reply = self._handler(request)

                    # Send the response
                    zmq_socket.send_pyobj(reply)
        finally:
            # Make sure to close the socket no matter what
            zmq_socket.close()
class ZMQSender(ZMQThread):
    """Convenience class for sending a python object to a ZMQ endpoint. Supports both synchronous/asynchronous sends."""

    def send_sync(self, request):
        """Send a request synchronously. This method will block until a response is received.

        The exchange runs in the calling thread (run() is invoked directly, no thread is started).
        If stop() has been called, the wait is abandoned and None is returned.

        request -- Any valid python object (serializable by pickle) that you wish to send.
        returns -- Response as a python object.
        """
        self._handler = None
        self._request = request

        # Forget the reply of any earlier exchange so a cancelled wait cannot return a stale value.
        self._reply = None

        self.run()
        return self._reply

    def send_async(self, request, handler):
        """Send a request asynchronously.

        The exchange runs on this object's own thread. A thread can only be started once, so
        this method can be used at most once per ZMQSender instance.

        request -- Any valid python object (serializable by pickle) that you wish to send.
        handler -- Callback to be invoked when a response is received. Must take one argument which
        is the response as a python object.

        Raises ValueError if handler is not callable.
        """
        # Make sure the caller specified a callable handler (function or method).
        if not callable(handler):
            raise ValueError("Handler argument must be callable!")

        self._handler = handler
        self._request = request
        self.start()

    def run(self):
        """Thread worker.

        Connects a DEALER socket to the address, sends the stored request, then polls until a
        reply arrives or stop() is called. The reply is stored on the instance and, when a
        handler is set, passed to it.
        """
        # Instantiate the dealer socket
        zmq_socket = ZMQContext().socket(zmq.DEALER)

        try:
            zmq_socket.connect(self._addr)

            # Send the request
            zmq_socket.send_pyobj(self._request)

            # Continue polling until we receive a response or we've been told to stop
            while not self._stop_event.is_set():
                if zmq_socket.poll(ZMQThread.DEFAULT_POLLING_INTERVAL_MS) != 0:
                    # Ready to receive a packet.
                    # SECURITY: recv_pyobj unpickles the reply, so only connect to trusted endpoints.
                    self._reply = zmq_socket.recv_pyobj()

                    # Successfully got a message, call the handler and return
                    if self._handler is not None:
                        self._handler(self._reply)
                    return
        finally:
            # Make sure to close the socket no matter what
            zmq_socket.close()
class InprocListenerMixin:
    """Derive from this mixin to add inproc ZMQ message handling to any python object."""

    @classmethod
    def default_inproc_addr(cls):
        """Return the base inproc address for this class: "inproc://" plus the class name."""
        # Generate an inproc address based on our class name.
        return "inproc://" + cls.__name__

    def __init__(self, handler):
        """Initializes the ZMQ listener on the inproc socket, and assigns the message handler.

        handler -- a callable taking one parameter which is the Python object being received.
        The handler's return value will be forwarded on to the sender as the message response.
        """
        # Give every instance its own endpoint by appending a random suffix to the class address.
        # A hex string keeps the address plain text: raw os.urandom() bytes cannot be concatenated
        # to a str on Python 3 and may contain NUL or other unprintable bytes on Python 2.
        self._inproc_addr = self.default_inproc_addr() + "-" + uuid.uuid4().hex

        # The listener thread starts right away, so the handler may be invoked as soon as this
        # constructor returns.
        self._listener = ZMQListener(self._inproc_addr, handler)
        self._listener.start()

    def inproc_addr(self):
        """Return the inproc address this instance is listening on."""
        return self._inproc_addr

    def __del__(self):
        """Stop and join the listener thread if it's still running."""
        # __init__ may have failed (or been bypassed) before the listener was created, in which
        # case there is nothing to shut down.
        listener = getattr(self, "_listener", None)
        if listener is not None and listener.is_alive():
            listener.stop()

            # Bounded wait so object finalization cannot hang on the worker thread.
            listener.join(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment