Skip to content

Instantly share code, notes, and snippets.

@sphaero
Last active May 24, 2016 15:53
Show Gist options
  • Save sphaero/c63bc1a4485d5bbbdd9478db4c1de6b0 to your computer and use it in GitHub Desktop.
Save sphaero/c63bc1a4485d5bbbdd9478db4c1de6b0 to your computer and use it in GitHub Desktop.
A Python Asyncore Dispatcher class for ZeroMQ (PyZMQ)
#!/usr/bin/python3
#
# A Python Asyncore Dispatcher class for ZeroMQ (PyZMQ)
# Copyright 2016 Arnaud Loonstra <[email protected]>
#
# I know asyncore is more or less deprecated, but it is still used in many
# frameworks. This class therefore makes it easy to integrate
# ZeroMQ sockets into those frameworks.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import zmq
import asyncore
import time
class ZmqDispatcher(asyncore.dispatcher):
    """Asyncore dispatcher that wraps a ZeroMQ (PyZMQ) socket.

    The file descriptor reported by the socket's ``zmq.FD`` option is
    registered in the asyncore channel map, so ``asyncore.loop()`` wakes up
    when the ZeroMQ socket may have events pending.  Subclasses override
    ``handle_read()`` and call ``recv()`` from it.
    """

    def __init__(self, socket, map=None, *args, **kwargs):
        """Wrap *socket* and register the dispatcher in *map*.

        Args:
            socket: the ZeroMQ socket this dispatcher serves.
            map: channel map handed to ``asyncore.dispatcher.__init__``.
            *args, **kwargs: accepted for signature compatibility; unused.
        """
        # No socket is handed to the base class: set_socket() below does the
        # ZeroMQ-specific registration instead.
        asyncore.dispatcher.__init__(self, None, map)
        self.set_socket(socket)

    def set_socket(self, sock, map=None):
        """Attach *sock* and add this dispatcher to the channel map."""
        self.socket = sock
        # Remember the descriptor ZeroMQ exposes for event notification
        # before registering the channel.
        self._fileno = sock.getsockopt(zmq.FD)
        self.add_channel(map)

    def bind(self, *args):
        """Bind the wrapped socket; arguments go to ``socket.bind()``."""
        # The dispatcher itself must not be forwarded: the first argument
        # of socket.bind() is the endpoint address.
        self.socket.bind(*args)

    def connect(self, *args):
        """Connect the wrapped socket and mark the dispatcher connected."""
        self.socket.connect(*args)
        self.connected = True

    def writable(self):
        """Return False: this dispatcher never asks for write events."""
        return False

    def send(self, data, *args):
        """Send *data* on the wrapped socket, forwarding extra arguments."""
        self.socket.send(data, *args)

    def recv(self, *args):
        """Receive from the wrapped socket and return the result."""
        return self.socket.recv(*args)

    def handle_read_event(self):
        """Call ``handle_read()`` for as long as the socket reports POLLIN."""
        # A wake-up of the descriptor does not by itself mean a message is
        # ready (and several may be queued), so the socket's own event state
        # is checked before and after every handle_read() call.
        revents = self.socket.getsockopt(zmq.EVENTS)
        while revents & zmq.POLLIN:
            self.handle_read()
            revents = self.socket.getsockopt(zmq.EVENTS)

    def handle_read(self):
        """Default read handler: report the missing override, drop the message.

        Subclasses are expected to override this method and call ``recv()``.
        """
        print("ERROR: You should overwrite the handle_read method!!!")
        # handle_read_event() keeps calling this method while POLLIN is set;
        # leaving the message unread would make that loop spin forever, so
        # the pending message is discarded.  The receive is non-blocking so
        # a direct call with nothing queued returns immediately.
        try:
            self.recv(zmq.NOBLOCK)
        except zmq.Again:
            pass
class ZmqTest(ZmqDispatcher):
    """Demo dispatcher that prints every message it receives."""

    def handle_read(self):
        """Receive one message and print it prefixed with ``RECV:``."""
        message = self.recv()
        print("RECV:", message)
if __name__ == '__main__':
    # Demo: a PUB socket publishes to a SUB socket in the same process; the
    # SUB side is serviced through asyncore via ZmqTest.
    c = zmq.Context()
    recvr = c.socket(zmq.SUB)
    sendr = c.socket(zmq.PUB)
    try:
        sendr.bind("tcp://*:3000")
        a = ZmqTest(recvr)
        a.connect("tcp://localhost:3000")
        # An empty prefix subscribes to every message.
        recvr.setsockopt(zmq.SUBSCRIBE, b'')
        while True:
            # Monotonic clock: the interval check must not be affected by
            # wall-clock adjustments.
            starttime = time.monotonic()
            asyncore.loop(timeout=5, count=1, use_poll=True)
            # Publish only when the loop pass ran into its full timeout,
            # i.e. nothing was received during the last five seconds.
            if time.monotonic() - starttime > 5:
                sendr.send(b'bla')
                print("SEND: b'bla'")
    finally:
        # Release the sockets without lingering on unsent messages so that
        # terminating the context cannot block on exit (e.g. after Ctrl-C).
        recvr.close(linger=0)
        sendr.close(linger=0)
        c.term()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment