-
-
Save anopheles/3706633 to your computer and use it in GitHub Desktop.
# encoding: utf-8
# ROUTER side of a ROUTER/DEALER example.
#
# Binds a ROUTER socket on TCP port 5556, counts the messages received from
# each peer identity and, on every pass of the loop, sends a START command to
# every identity seen so far.
import zmq
from collections import defaultdict

context = zmq.Context()
client = context.socket(zmq.ROUTER)
client.bind("tcp://*:5556")

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

# Number of messages received so far, keyed by the sender's identity frame.
counter = defaultdict(int)

while True:
    # handle input: wait up to one second for an incoming message
    sockets = dict(poll.poll(1000))
    if client in sockets:
        # Read the whole message in one call: the first frame is the sender
        # identity, everything after it is payload. Two separate recv()
        # calls would leave extra payload frames queued, and the next pass
        # would then mistake one of them for an identity.
        frames = client.recv_multipart()
        identity = frames[0]
        msg = frames[1:]
        counter[identity] += 1

    # start recording: address a START command to every known peer
    for identity in counter:
        # Frames are sent as bytes so the script also runs on Python 3.
        client.send_multipart([identity, b"START"])

    print(counter)
# encoding: utf-8
# DEALER side of a ROUTER/DEALER example.
#
# Connects to the router on localhost:5556 under a random numeric identity,
# announces itself with "Hello", then obeys START / STOP / END commands.
# While started, it sends one "recording data" message per loop pass.
import random
import zmq
import time

context = zmq.Context()
worker = context.socket(zmq.DEALER)
# Socket identities and message frames are bytes; encoding explicitly keeps
# the script working on Python 3 as well as Python 2.
worker.setsockopt(zmq.IDENTITY, str(random.randint(0, 8000)).encode("ascii"))
worker.connect("tcp://localhost:5556")

start = False
worker.send(b"Hello")

while True:
    if start:
        reading = "recording data: %s" % random.randint(0, 100)
        worker.send(reading.encode("ascii"))
        time.sleep(0.5)

    # Blocks until the router sends the next command.
    request = worker.recv()
    # Compare against bytes: recv() returns bytes, which never equal a text
    # string on Python 3.
    if request == b"START":
        start = True
    elif request == b"STOP":
        start = False
    elif request == b"END":
        print("A is finishing")
        break
I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:
import time
from threading import Thread
import zmq
def worker_thread():
    """Run a DEALER peer with identity 'A' that answers ten messages.

    Each of the ten rounds blocks on a receive, prints a notice and then
    sends back a two-frame reply.
    """
    dealer = zmq.Context.instance().socket(zmq.DEALER)
    dealer.setsockopt(zmq.IDENTITY, 'A')
    dealer.connect("tcp://127.0.0.1:5559")

    remaining = 10
    while remaining > 0:
        dealer.recv()  # the received frame itself is not inspected
        print('worker recieved')
        dealer.send_multipart(['A', "data_recieved"])
        remaining -= 1
# Bind the ROUTER end first, then run the worker in a background thread of
# the same process.
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

background = Thread(target=worker_thread)
background.start()
# NOTE(review): presumably this pause gives the worker time to connect
# before the first send -- confirm.
time.sleep(2)

exchanges = 0
while exchanges < 10:
    # First frame addresses the peer whose identity is 'A'.
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print('worker responded')
    exchanges += 1
This seems to work as expected: there is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message and the worker blocks indefinitely while waiting to receive it. Why would this happen?
Client:
import time
import zmq
# Standalone ROUTER client: bind, then run ten send/receive rounds addressed
# to the peer with identity 'A'.
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

# NOTE(review): nothing here waits for the worker to connect before the
# first send. By default a ROUTER socket reportedly discards messages for
# identities it does not know yet -- confirm against the ZeroMQ docs.
exchanges = 0
while exchanges < 10:
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print('worker responded')
    exchanges += 1
Worker:
import time
import zmq
# Standalone DEALER worker: connect under identity 'A', then answer ten
# incoming messages with a two-frame reply each.
cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")

remaining = 10
while remaining > 0:
    request = worker.recv()  # blocks until the client sends something
    print('worker recieved')
    worker.send_multipart(['A', "data_recieved"])
    remaining -= 1
I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.
I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:
import time
from threading import Thread
import zmq


def worker_thread():
    """Run a DEALER peer with identity 'A' that answers ten messages."""
    dealer = zmq.Context.instance().socket(zmq.DEALER)
    dealer.setsockopt(zmq.IDENTITY, 'A')
    dealer.connect("tcp://127.0.0.1:5559")
    remaining = 10
    while remaining > 0:
        dealer.recv()  # the received frame itself is not inspected
        print('worker recieved')
        dealer.send_multipart(['A', "data_recieved"])
        remaining -= 1


# Bind the ROUTER end, start the worker thread, then run ten rounds.
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

background = Thread(target=worker_thread)
background.start()
time.sleep(2)

exchanges = 0
while exchanges < 10:
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print('worker responded')
    exchanges += 1
This seems to work as expected: there is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message and the worker blocks indefinitely while waiting to receive it. Why would this happen?
Client:
import time
import zmq

# Standalone ROUTER client: bind, then run ten send/receive rounds addressed
# to the peer with identity 'A'.
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')

exchanges = 0
while exchanges < 10:
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print('worker responded')
    exchanges += 1
Worker:
import time
import zmq

# Standalone DEALER worker: connect under identity 'A', then answer ten
# incoming messages with a two-frame reply each.
cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")

remaining = 10
while remaining > 0:
    request = worker.recv()  # blocks until the client sends something
    print('worker recieved')
    worker.send_multipart(['A', "data_recieved"])
    remaining -= 1
I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.
I don't believe `recv` is implemented in `zmq.DEALER`, only `recv_multipart`.
A simple and elegant example, thank you... it cleared up a lot of misunderstandings I had from working with ZMQ.
The balancing issue was caused by design! The workers need a fair amount of time to initialize. After putting a time.sleep(0.5) after the worker.send() on line 14 of worker.py, the messages arrive evenly.