Skip to content

Instantly share code, notes, and snippets.

@lebedov
Created June 11, 2012 20:22
Show Gist options
  • Save lebedov/2912423 to your computer and use it in GitHub Desktop.
Save lebedov/2912423 to your computer and use it in GitHub Desktop.
Use eventloops to send and receive data
#!/usr/bin/env python
"""
Use eventloops to send and receive data.
"""
import logging
import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream
import multiprocessing as mp
def sub(n):
ctx = zmq.Context()
logger = logging.getLogger('sub '+str(n))
# Only subscribe to data directed to this subscriber:
sock_data = ctx.socket(zmq.SUB)
sock_data.connect("tcp://localhost:5555")
sock_data.setsockopt(zmq.SUBSCRIBE, str(n))
sock_sync = ctx.socket(zmq.DEALER)
sock_sync.setsockopt(zmq.IDENTITY, str(n))
sock_sync.connect("tcp://localhost:5556")
# Wait for dummy data ('sync') used for synchronization:
ioloop = IOLoop.instance()
stream = ZMQStream(sock_data, ioloop)
def handler(msg):
data = msg[1].decode()
logger.info('received: %s', data)
if data == 'sync':
stream.flush()
ioloop.stop()
stream.on_recv(handler)
ioloop.start()
# Send acknowledgment:
sock_sync.send('ack')
# Process data until a 'quit' signal is received:
def handler(msg):
data = msg[1].decode()
logger.info('received: %s', data)
if data == 'quit':
stream.flush()
ioloop.stop()
stream.on_recv(handler)
ioloop.start()
def pub(n):
ctx = zmq.Context()
logger = logging.getLogger('pub ')
sock_data = ctx.socket(zmq.PUB)
sock_data.bind("tcp://*:5555")
sock_sync = ctx.socket(zmq.ROUTER)
sock_sync.bind("tcp://*:5556")
ack_list = set(range(n))
# Send sync signals periodically; the callback time interval should not be
# too small:
ioloop = IOLoop.instance()
def send():
for i in xrange(n):
sock_data.send_multipart([str(i), 'sync'])
pc = PeriodicCallback(send, 50, ioloop)
pc.start()
stream = ZMQStream(sock_sync, ioloop)
# Stop sending sync signals when all subscribers acknowledge the
# synchronization:
def handler(msg):
addr = msg[0].decode()
data = msg[1].decode()
logger.info('received: ack from %s', addr)
ack_list.remove(int(addr))
if len(ack_list) == 0:
stream.flush()
ioloop.stop()
stream.on_recv(handler)
ioloop.start()
# Try sending data to different subscribers:
for i in xrange(n):
for j in xrange(i*5, (i+1)*5):
sock_data.send_multipart([str(i), 'data'+str(j)])
# All done:
sock_data.send_multipart([str(i), 'quit'])
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s %(levelname)s %(message)s')
N = 3
for i in xrange(N):
mp.Process(target=sub, args=(i,)).start()
pub(N)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment