# encoding: utf-8
import zmq
from collections import defaultdict
context = zmq.Context()
client = context.socket(zmq.ROUTER)
client.bind("tcp://*:5556")
poll = zmq.Poller()
poll.register(client, zmq.POLLIN)
counter = defaultdict(int)
while True:
    # handle input
    sockets = dict(poll.poll(1000))
    if sockets:
        identity = client.recv()
        msg = client.recv()
        counter[identity] += 1
    # start recording
    for identity in counter.keys():
        client.send(identity, zmq.SNDMORE)
        client.send("START")
    print counter
# encoding: utf-8
import random
import zmq
import time
context = zmq.Context()
worker = context.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, str(random.randint(0, 8000)))
worker.connect("tcp://localhost:5556")
start = False
worker.send("Hello")
while True:
    if start:
        worker.send("recording data: %s" % random.randint(0,100))
        time.sleep(0.5)
    request = worker.recv()
    if request == "START":
        start = True
    if request == "STOP":
        start = False
    if request == "END":
        print "A is finishing"
        break
The balancing issue was caused by design! The workers need a fair amount of time to initialize. With a time.sleep(0.5) added after the worker.send() in line 14 of worker.py, the messages arrive evenly.
I am trying to achieve two-way communication using a dealer and a router. I modified your example slightly:
import time
from threading import Thread
import zmq
def worker_thread():
    cxt = zmq.Context.instance()
    worker = cxt.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, 'A')
    worker.connect("tcp://127.0.0.1:5559")
    for _ in range(10):
        request = worker.recv()
        print 'worker recieved'
        worker.send_multipart(['A', "data_recieved"])
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')
Thread(target=worker_thread).start()
time.sleep(2)
for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'
This seems to work as expected: there is an exchange of 10 messages. However, when I split the worker and client code into two separate script files, as shown below, the client sends a message and the worker blocks indefinitely while waiting to receive it. Why would this happen?
Client:
import time
import zmq
cxt = zmq.Context.instance()
client = cxt.socket(zmq.ROUTER)
client.bind('tcp://127.0.0.1:5559')
for _ in range(10):
    client.send_multipart(['A', 'data'])
    request = client.recv()
    print 'worker responded'
Worker:
import time
import zmq
cxt = zmq.Context.instance()
worker = cxt.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, 'A')
worker.connect("tcp://127.0.0.1:5559")
for _ in range(10):
    request = worker.recv()
    print 'worker recieved'
    worker.send_multipart(['A', "data_recieved"])
I am new to ZeroMQ and relatively new to Python, so I apologise if this issue is caused by something obvious or mundane.
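One plausible explanation, not confirmed anywhere in this thread: by default a ROUTER socket silently drops a message addressed to an identity it does not yet know, and the split client script sends immediately after bind(), without the time.sleep(2) that gave the threaded worker time to connect. The sketch below avoids the race by having the worker announce itself first. It is written for Python 3 with pyzmq (so frames are bytes), and the function names, the b"READY" handshake and the random port are assumptions made for illustration. A thread stands in for the second script only to keep the example self-contained.

```python
import threading
import zmq


def worker(endpoint, n_messages):
    """DEALER side: announce itself, then answer each request."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, b"A")
    sock.connect(endpoint)
    # Hypothetical handshake: tells the router that identity "A" is connected.
    sock.send(b"READY")
    for _ in range(n_messages):
        sock.recv()                   # the DEALER sees only the payload frame
        sock.send(b"data_received")   # the ROUTER prepends the identity itself
    sock.close()


def exchange(n_messages=10):
    """ROUTER side: wait for the handshake before sending anything."""
    ctx = zmq.Context.instance()
    client = ctx.socket(zmq.ROUTER)
    # A random free port (assumption) instead of the fixed 5559 from the question.
    port = client.bind_to_random_port("tcp://127.0.0.1")
    endpoint = "tcp://127.0.0.1:%d" % port

    thread = threading.Thread(target=worker, args=(endpoint, n_messages))
    thread.start()

    # Blocks until the worker has connected, so later sends are routable.
    identity, _hello = client.recv_multipart()

    replies = []
    for _ in range(n_messages):
        client.send_multipart([identity, b"data"])
        replies.append(client.recv_multipart())   # [identity, payload]

    thread.join()
    client.close()
    return replies
```

Calling exchange(10) should return ten [identity, payload] pairs. The same ordering (worker sends first, router replies only to identities it has heard from) carries over to two separate scripts, and it is also what the router and worker at the top of this page do with the "Hello" message.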
I don't believe recv is implemented in zmq.DEALER, only recv_multipart.
A simple and elegant example, thank you...it cleared a lot of misunderstandings I had from working with ZMQ...
This example suffers from balancing issues.
The router simply waits for a welcome message from each worker and
replies with a "START" command, which makes the worker process start
sending random data.
The router process counts the number of messages it receives from each
client. As already stated, messages from some clients seem to be
received far more often than others. After a few seconds the counter
dictionary contains {'5906': 1, '3801': 147313, '4712': 24986},
meaning that the router received 24986 messages from the worker with
id 4712 but only one from the worker with id 5906.
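To make the imbalance easier to read, the counts can be turned into per-worker shares. This is a small illustrative helper (the name message_shares is an assumption, not part of the gist), written for Python 3 and fed with the counter values quoted above.

```python
def message_shares(counter):
    """Return each identity's fraction of all messages received."""
    total = sum(counter.values())
    if total == 0:
        return {}
    return {identity: count / total for identity, count in counter.items()}


# Counter values quoted in the description above.
counts = {'5906': 1, '3801': 147313, '4712': 24986}
shares = message_shares(counts)

# Print the workers from most to least represented.
for identity, share in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print("%s: %.2f%%" % (identity, 100 * share))
```

With perfectly even delivery each of the three workers would sit near one third; here a single worker accounts for the large majority of the messages.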