-
-
Save hetsch/2859464 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple task farm, with routed replies in pyzmq | |
For http://stackoverflow.com/questions/7809200/implementing-task-farm-messaging-pattern-with-zeromq | |
Note that things are run in threads to keep stuff in one file, there is no | |
reason they need to be. | |
License: Public Domain | |
""" | |
import os | |
import time | |
import random | |
import zmq | |
import threading | |
# ZeroMQ context used to create every socket in this file.
ctx = zmq.Context.instance()

# Endpoint the scheduler's ROUTER socket binds; clients connect here.
client_iface = "tcp://127.0.0.1:5555"
# Endpoint the scheduler's DEALER socket binds; engines connect here.
engine_iface = "tcp://127.0.0.1:5556"
def scheduler():
    """ROUTER-DEALER queue device.

    Load-balances requests from clients across engines, and routes replies
    back to the originating client. The ROUTER side is bound to
    ``client_iface`` and the DEALER side to ``engine_iface``.

    The function loops forever; it only leaves the loop if a socket call
    raises, in which case both sockets are closed before the error propagates.
    """
    router = ctx.socket(zmq.ROUTER)
    dealer = ctx.socket(zmq.DEALER)
    try:
        router.bind(client_iface)
        # this is optional, it just makes identities more obvious when they appear
        dealer.setsockopt(zmq.IDENTITY, b'Controller.DEALER')
        dealer.bind(engine_iface)

        # the remainder of this function can be entirely replaced with
        #     zmq.device(zmq.QUEUE, router, dealer)
        # but this shows what is really going on:
        poller = zmq.Poller()
        poller.register(router, zmq.POLLIN)
        poller.register(dealer, zmq.POLLIN)

        while True:
            # poll() returns a list of tuples [(socket, evt), (socket, evt)]
            # dict(poll()) turns this into {socket:evt, socket:evt}
            evts = dict(poller.poll())

            if router in evts:
                # ROUTER sockets prepend the identity of the sender, for
                # routing replies; the whole envelope is forwarded untouched.
                msg = router.recv_multipart()
                print("Controller.ROUTER received %s, relaying via DEALER" % (msg,))
                dealer.send_multipart(msg)

            if dealer in evts:
                # Replies arrive with the client identity as the first part,
                # which the ROUTER consumes to pick the destination.
                msg = dealer.recv_multipart()
                print("Controller.DEALER received %s, relaying via ROUTER" % (msg,))
                router.send_multipart(msg)
    finally:
        router.close()
        dealer.close()
def process_request(msg):
    """Process the message by reversing the letters of every part.

    ``msg`` is an iterable of sliceable parts (e.g. ``bytes`` or ``str``).
    Returns a new list with each part reversed, in the original order.
    """
    return [part[::-1] for part in msg]
def engine(id):
    """The engine - receives messages, performs some action, and sends a reply,
    preserving the leading two message parts as routing identities.

    ``id`` is only used to label this engine's log output.

    The function loops forever; the socket is closed if a socket call raises.
    """
    s = ctx.socket(zmq.ROUTER)
    try:
        s.connect(engine_iface)
        while True:
            msg = s.recv_multipart()
            print("engine %s recvd message: %s" % (id, msg))
            # note that the first two parts will be
            # [b'Controller.DEALER', b'Client.<id>']: this ROUTER socket
            # prepends the identity of its peer (the scheduler's DEALER), and
            # the scheduler's ROUTER already prepended the client identity.
            # these are needed for the reply to propagate up to the right client
            idents, request = msg[:2], msg[2:]
            reply = idents + process_request(request)
            print("engine %s sending reply: %s" % (id, reply))
            s.send_multipart(reply)
    finally:
        s.close()
def client(id, n):
    """The client - sends messages, and receives replies after they
    have been processed by the engines.

    ``id`` is used to build this client's socket identity (``Client.<id>``)
    and to label its log output. ``n`` is the number of request/reply round
    trips to perform before returning.
    """
    s = ctx.socket(zmq.DEALER)
    try:
        # Socket identities and message frames must be bytes; encoding keeps
        # this working where str is unicode, and is a no-op for ASCII on
        # interpreters where str is already bytes.
        s.identity = ("Client.%s" % id).encode("ascii")
        s.connect(client_iface)
        for _ in range(n):
            msg = [
                b"hello",
                b"world",
                str(random.randint(10, 100)).encode("ascii"),
            ]
            print("client %s sending : %s" % (id, msg))
            s.send_multipart(msg)
            # Block until the reply for this request comes back.
            msg = s.recv_multipart()
            print("client %s received: %s" % (id, msg))
            time.sleep(0.1)
    finally:
        s.close()
if __name__ == '__main__':
    # The scheduler and the engines loop forever, so they run as daemon
    # threads and do not keep the process alive once the clients finish.
    st = threading.Thread(target=scheduler)
    st.daemon = True
    st.start()

    engines = []
    for i in range(4):
        t = threading.Thread(target=engine, args=(i,))
        t.daemon = True
        t.start()
        engines.append(t)

    # now start a few clients, and fire off some requests
    clients = []
    for i in range(3):
        t = threading.Thread(target=client, args=(i, 12))
        t.start()
        # remove this t.join() to allow clients to be run concurrently.
        # this will work just fine, but the print-statements will
        # be less easy to follow
        t.join()
        clients.append(t)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment