-
-
Save anopheles/3706633 to your computer and use it in GitHub Desktop.
| # encoding: utf-8 | |
| import zmq | |
| from collections import defaultdict | |
| context = zmq.Context() | |
| client = context.socket(zmq.ROUTER) | |
| client.bind("tcp://*:5556") | |
| poll = zmq.Poller() | |
| poll.register(client, zmq.POLLIN) | |
| counter = defaultdict(int) | |
| while True: | |
| # handle input | |
| sockets = dict(poll.poll(1000)) | |
| if sockets: | |
| identity = client.recv() | |
| msg = client.recv() | |
| counter[identity] += 1 | |
| # start recording | |
| for identity in counter.keys(): | |
| client.send(identity, zmq.SNDMORE) | |
| client.send("START") | |
| print counter |
| # encoding: utf-8 | |
| import random | |
| import zmq | |
| import time | |
| context = zmq.Context() | |
| worker = context.socket(zmq.DEALER) | |
| worker.setsockopt(zmq.IDENTITY, str(random.randint(0, 8000))) | |
| worker.connect("tcp://localhost:5556") | |
| start = False | |
| worker.send("Hello") | |
| while True: | |
| if start: | |
| worker.send("recording data: %s" % random.randint(0,100)) | |
| time.sleep(0.5) | |
| request = worker.recv() | |
| if request == "START": | |
| start = True | |
| if request == "STOP": | |
| start = False | |
| if request == "END": | |
| print "A is finishing" | |
| break |
I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:
import time
from threading import Thread
import zmq
def worker_thread():
cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")
for _ in range(10):
request = worker.recv()
print 'worker recieved'
worker.send_multipart(['A', "data_recieved"])
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')
Thread(target=worker_thread).start()
time.sleep(2)
for _ in range(10):
client.send_multipart(['A', 'data'])
request = client.recv()
print 'worker responded'
This seems to work as expected. There is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message, and the worker blocks infinitely while waiting to receive it. Why would this happen?
Client:
import time
import zmq
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')
for _ in range(10):
client.send_multipart(['A', 'data'])
request = client.recv()
print 'worker responded'
Worker:
import time
import zmq
cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")
for _ in range(10):
request = worker.recv()
print 'worker recieved'
worker.send_multipart(['A', "data_recieved"])
I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.
I am trying to achieve two way communication using a dealer and a router. I modified your example slightly:
import time from threading import Thread import zmq def worker_thread(): cxt = zmq.Context.instance() worker = cxt.socket(zmq.DEALER) worker.setsockopt(zmq.IDENTITY, 'A') worker.connect("tcp://127.0.0.1:5559") for _ in range(10): request = worker.recv() print 'worker recieved' worker.send_multipart(['A', "data_recieved"]) cxt = zmq.Context.instance() client = cxt.socket(zmq.ROUTER) client.bind('tcp://127.0.0.1:5559') Thread(target=worker_thread).start() time.sleep(2) for _ in range(10): client.send_multipart(['A', 'data']) request = client.recv() print 'worker responded'This seems to work as expected. There is an exchange of 10 messages. However, when I split the worker and client code into two different script files, as seen below, the client sends a message, and the worker blocks infinitely while waiting to receive it. Why would this happen?
Client:
import time import zmq cxt = zmq.Context.instance() client = cxt.socket(zmq.ROUTER) client.bind('tcp://127.0.0.1:5559') for _ in range(10): client.send_multipart(['A', 'data']) request = client.recv() print 'worker responded'Worker:
import time import zmq cxt = zmq.Context.instance() worker = cxt.socket(zmq.DEALER) worker.setsockopt(zmq.IDENTITY, 'A') worker.connect("tcp://127.0.0.1:5559") for _ in range(10): request = worker.recv() print 'worker recieved' worker.send_multipart(['A', "data_recieved"])I am new to ZeroMQ and relatively new to python, so I apologise if this issue is caused by something obvious or mundane.
I don't believe recv is implemented in zmq.DEALER, only recv_multipart.
A simple and elegant example, thank you...it cleared a lot of misunderstandings I had from working with ZMQ...
The balancing issue was caused by design! The workers need a fair amount of time to initialize. Putting a time.sleep(0.5) after the worker.send() in line 14 in the worker.py the messages arrive evenly.