Created
June 11, 2012 20:22
-
-
Save lebedov/2912423 to your computer and use it in GitHub Desktop.
Use eventloops to send and receive data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Use eventloops to send and receive data. | |
""" | |
import logging | |
import zmq | |
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback | |
from zmq.eventloop.zmqstream import ZMQStream | |
import multiprocessing as mp | |
def sub(n):
    """Run subscriber `n`: synchronize with the publisher, then log its data.

    The subscriber connects a SUB socket to tcp://localhost:5555 and a DEALER
    socket (whose identity is the decimal string of `n`) to
    tcp://localhost:5556.  It waits for a 'sync' message addressed to it,
    replies with a single 'ack' on the DEALER socket, and then logs every
    message addressed to it until a 'quit' message arrives.

    Parameters
    ----------
    n : int
        Subscriber number; used as subscription topic, socket identity and
        as part of the logger name.
    """
    ctx = zmq.Context()
    logger = logging.getLogger('sub '+str(n))

    # Socket options and message frames must be bytes on Python 3; encoding
    # an ASCII decimal string is a no-op on Python 2.
    topic = str(n).encode()

    # Only subscribe to data directed to this subscriber:
    sock_data = ctx.socket(zmq.SUB)
    sock_data.connect("tcp://localhost:5555")
    sock_data.setsockopt(zmq.SUBSCRIBE, topic)

    sock_sync = ctx.socket(zmq.DEALER)
    sock_sync.setsockopt(zmq.IDENTITY, topic)
    sock_sync.connect("tcp://localhost:5556")

    ioloop = IOLoop.instance()
    stream = ZMQStream(sock_data, ioloop)

    def wait_for(token):
        """Log incoming messages until one whose payload equals `token`."""
        def handler(msg):
            # SUB subscriptions are prefix matches, so topic b'1' would also
            # deliver messages meant for b'10', b'11', ...; only accept
            # frames whose topic matches exactly.
            if msg[0] != topic:
                return
            data = msg[1].decode()
            logger.info('received: %s', data)
            if data == token:
                stream.flush()
                ioloop.stop()
        stream.on_recv(handler)
        ioloop.start()

    # Wait for dummy data ('sync') used for synchronization:
    wait_for('sync')

    # Send acknowledgment:
    sock_sync.send(b'ack')

    # Process data until a 'quit' signal is received:
    wait_for('quit')

    # Release the sockets and the context explicitly instead of relying on
    # interpreter shutdown.
    stream.close()
    sock_sync.close()
    ctx.term()
def pub(n):
    """Publish data to `n` subscribers after synchronizing with all of them.

    The publisher binds a PUB socket to tcp://*:5555 and a ROUTER socket to
    tcp://*:5556.  It sends a 'sync' message to every subscriber topic every
    50 ms until each of the subscribers 0..n-1 has acknowledged on the ROUTER
    socket, then sends five 'data<j>' messages followed by 'quit' to each
    subscriber.

    Parameters
    ----------
    n : int
        Number of subscribers; subscriber topics/identities are the decimal
        strings '0' .. str(n-1).
    """
    ctx = zmq.Context()
    logger = logging.getLogger('pub ')

    sock_data = ctx.socket(zmq.PUB)
    sock_data.bind("tcp://*:5555")

    sock_sync = ctx.socket(zmq.ROUTER)
    sock_sync.bind("tcp://*:5556")

    # Message frames must be bytes on Python 3; encode each topic once.
    topics = [str(i).encode() for i in range(n)]

    # Subscribers that have not acknowledged the synchronization yet:
    ack_list = set(range(n))

    ioloop = IOLoop.instance()
    stream = ZMQStream(sock_sync, ioloop)

    # Send sync signals periodically; the callback time interval should not be
    # too small:
    def send():
        for topic in topics:
            sock_data.send_multipart([topic, b'sync'])

    # Stop sending sync signals when all subscribers acknowledge the
    # synchronization:
    def handler(msg):
        addr = msg[0].decode()
        logger.info('received: ack from %s', addr)
        # discard() tolerates a repeated ack from the same subscriber.
        ack_list.discard(int(addr))
        if not ack_list:
            stream.flush()
            ioloop.stop()

    # With no subscribers there is nothing to wait for; starting the loop
    # would block forever because no ack can ever arrive.
    if ack_list:
        # The loop is not passed explicitly: newer tornado-backed pyzmq
        # releases no longer accept an io_loop argument here, and older ones
        # default to the same IOLoop instance used above.
        pc = PeriodicCallback(send, 50)
        pc.start()
        stream.on_recv(handler)
        ioloop.start()
        pc.stop()

    # Try sending data to different subscribers:
    for i, topic in enumerate(topics):
        for j in range(i*5, (i+1)*5):
            sock_data.send_multipart([topic, ('data'+str(j)).encode()])

    # All done; every subscriber needs its own 'quit', otherwise only the
    # last one would ever leave its receive loop:
    for topic in topics:
        sock_data.send_multipart([topic, b'quit'])

    # Release the sockets and the context explicitly instead of relying on
    # interpreter shutdown.
    stream.close()
    sock_data.close()
    ctx.term()
if __name__ == '__main__':
    # Script entry point: start N subscriber processes, then run the
    # publisher in this process.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(name)s %(levelname)s %(message)s')

    N = 3

    # range() works on both Python 2 and 3 (xrange exists only on Python 2).
    procs = [mp.Process(target=sub, args=(i,)) for i in range(N)]
    for proc in procs:
        proc.start()

    pub(N)

    # Wait for the subscribers to finish before exiting.
    for proc in procs:
        proc.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment