Last active
May 24, 2016 15:53
-
-
Save sphaero/c63bc1a4485d5bbbdd9478db4c1de6b0 to your computer and use it in GitHub Desktop.
A Python Asyncore Dispatcher class for ZeroMQ (PyZMQ)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# A Python Asyncore Dispatcher class for ZeroMQ (PyZMQ) | |
# Copyright 2016 Arnaud Loonstra <[email protected]> | |
# | |
# I know Asyncore is sort of deprecated but it still used in many | |
# frameworks. Therefore this class makes it easy to implement | |
# ZeroMQ sockets into those frameworks. | |
# | |
# This Source Code Form is subject to the terms of the Mozilla Public | |
# License, v. 2.0. If a copy of the MPL was not distributed with this | |
# file, You can obtain one at http://mozilla.org/MPL/2.0/. | |
import zmq | |
import asyncore | |
import time | |
class ZmqDispatcher(asyncore.dispatcher): | |
def __init__(self, socket, map=None, *args, **kwargs): | |
asyncore.dispatcher.__init__(self, None, map) | |
self.set_socket(socket) | |
def set_socket(self, sock, map=None): | |
self.socket = sock | |
self._fileno = sock.getsockopt(zmq.FD) | |
self.add_channel(map) | |
def bind(self, *args): | |
self.socket.bind(self, *args) | |
def connect(self, *args): | |
self.socket.connect(*args) | |
self.connected = True | |
def writable(self): | |
return False | |
def send(self, data, *args): | |
self.socket.send(data, *args) | |
def recv(self, *args): | |
return self.socket.recv(*args) | |
def handle_read_event(self): | |
# check if really readable | |
revents = self.socket.getsockopt(zmq.EVENTS) | |
while revents & zmq.POLLIN: | |
self.handle_read() | |
revents = self.socket.getsockopt(zmq.EVENTS) | |
def handle_read(self): | |
print("ERROR: You should overwrite the handle_read method!!!") | |
class ZmqTest(ZmqDispatcher): | |
def handle_read(self): | |
print("RECV:", self.recv()) | |
if __name__ == '__main__': | |
c = zmq.Context() | |
recvr = zmq.Socket(c, zmq.SUB) | |
sendr = zmq.Socket(c, zmq.PUB) | |
sendr.bind("tcp://*:3000") | |
a = ZmqTest(recvr) | |
a.connect("tcp://localhost:3000") | |
recvr.setsockopt(zmq.SUBSCRIBE,b'') | |
while True: | |
starttime = time.time() | |
asyncore.loop(timeout=5, count=1, use_poll=True) | |
if time.time() - starttime > 5: | |
sendr.send(b'bla') | |
print("SEND: b'bla") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment