Created
January 19, 2016 15:26
-
-
Save ivanalejandro0/dd036c70d21349dff950 to your computer and use it in GitHub Desktop.
ROUTER-DEALER zmq tests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# DEALER to ROUTER (N->1) | |
# This can be used by many clients that share one central worker.
import pprint | |
import random | |
import time | |
import uuid | |
from threading import Thread | |
import zmq | |
def process_request(request):
    """Build the reply payload for one request.

    The last byte of ``request`` is read as a decimal digit.  Payloads that
    are empty, are not sliceable, or do not end in a digit count as ``0``.

    :param request: raw message payload (a byte string).
    :returns: the digit as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice rather than index: indexing bytes gives an int on Python 3,
        # and slicing an empty payload yields an empty string, not an error.
        n = int(request[-1:])
    except (TypeError, ValueError):
        n = 0
    # ``bytes(n)`` renders the digit only on Python 2; on Python 3 it would
    # create ``n`` NUL bytes, so format the number explicitly.
    return str(n).encode("ascii")
def worker():
    """Serve requests on a ROUTER socket connected to ``ipc://routing.ipc``.

    Every incoming message is unpacked as three frames,
    ``[client_id, msg_id, payload]``.  A random choice (two chances out of
    three) decides whether the request is answered with
    ``[client_id, msg_id, process_request(payload)]``; otherwise it is left
    unanswered.  A ``STATUS`` payload additionally gets a reply carrying the
    number of requests counted so far, and an ``END`` payload stops the loop.
    """
    context = zmq.Context.instance()
    # Named ``sock`` so the local does not shadow this function's own name.
    sock = context.socket(zmq.ROUTER)
    sock.connect("ipc://routing.ipc")

    total = 0
    try:
        while True:
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            client_id, msg_id, msg = request

            if random.choice([True, True, False]):  # should I respond?
                response = process_request(msg)
                sock.send_multipart([client_id, msg_id, response])

            if msg == b"END":
                break

            if msg == b'STATUS':
                # send_multipart() needs a list of frames; a bare string
                # would be sent one character per frame and without the
                # routing envelope the ROUTER socket requires.
                status = ("A received: %s" % total).encode("ascii")
                sock.send_multipart([client_id, msg_id, status])

            total += 1
    finally:
        sock.close()
# Client side of the test: a DEALER socket with the fixed identity ``A``
# that binds the IPC endpoint the worker connects to.
context = zmq.Context.instance()
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b'A')
client.bind("ipc://routing.ipc")

# Launch the worker in its own thread, then pause for a second so both
# ends have time to settle before any request is sent.
_worker_thread = Thread(target=worker)
_worker_thread.start()
time.sleep(1)
def dispatch(r):
    """Send one request through the module-level ``client`` socket.

    The message is sent as two frames, ``[uid, payload]``, where ``uid`` is
    a freshly generated UUID4 in its canonical text form.

    :param r: request payload; it is passed through ``bytes()`` before
        being sent.
    :returns: the ``uid`` byte string that identifies this request.
    """
    # ``bytes(uuid)`` stringifies only on Python 2; encoding the text form
    # gives the same byte string on Python 2 and Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from the module-level ``client`` socket.

    This never blocks.

    :returns: the list of frames of the next queued message, or ``None``
        when nothing is waiting to be received.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Raised by a non-blocking receive when no message is queued.
        # Any other socket error is a real failure and is left to propagate.
        return None
def main():
    """Send ten workloads plus ``END`` and print which ones were answered.

    ``requests`` maps each request uid to ``{'work': payload}``; a
    ``'response'`` key is added when a reply with that uid is received.
    Replies are picked up opportunistically after each send and once more
    after the final pause, so late replies still appear in the report.
    """
    requests = {}

    def record(reply):
        """Store a ``[uid, response]`` reply on its matching request."""
        print("Received:  %s" % (reply,))
        entry = requests.get(reply[0])
        # Replies to untracked requests (such as END) are not part of the
        # report and must not raise KeyError.
        if entry is not None:
            entry['response'] = reply[1]

    for i in range(10):
        # bytes has no .format() on Python 3: format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}

        r = receive()
        if r is not None:
            record(r)

    dispatch(b'END')
    time.sleep(1)

    # Collect whatever arrived while sending and during the pause above.
    r = receive()
    while r is not None:
        record(r)
        r = receive()

    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# DEALER to ROUTER (N->1) | |
# This can be used by many clients that share one central worker.
import pprint | |
import random | |
import time | |
import uuid | |
from threading import Thread | |
import zmq | |
def process_request(request):
    """Build the reply payload for one request.

    The last byte of ``request`` is read as a decimal digit.  Payloads that
    are empty, are not sliceable, or do not end in a digit count as ``0``.

    :param request: raw message payload (a byte string).
    :returns: the digit as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice rather than index: indexing bytes gives an int on Python 3,
        # and slicing an empty payload yields an empty string, not an error.
        n = int(request[-1:])
    except (TypeError, ValueError):
        n = 0
    # ``bytes(n)`` renders the digit only on Python 2; on Python 3 it would
    # create ``n`` NUL bytes, so format the number explicitly.
    return str(n).encode("ascii")
def worker():
    """Serve requests on a ROUTER socket bound to ``ipc://routing.ipc``.

    Every incoming message is unpacked as three frames,
    ``[client_id, msg_id, payload]``.  A random choice (two chances out of
    three) decides whether the request is answered.  Replies are always
    addressed to the peer with identity ``B`` rather than to the sender.
    A ``STATUS`` payload additionally gets a reply with the number of
    requests counted so far, and an ``END`` payload stops the loop.
    """
    context = zmq.Context.instance()
    # Named ``sock`` so the local does not shadow this function's own name.
    sock = context.socket(zmq.ROUTER)
    sock.bind("ipc://routing.ipc")

    total = 0
    try:
        while True:
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            # The sender's identity is not used: replies go to ``B``.
            _client_id, msg_id, msg = request

            if random.choice([True, True, False]):  # should I respond?
                response = process_request(msg)
                sock.send_multipart([b'B', msg_id, response])

            if msg == b"END":
                break

            if msg == b'STATUS':
                # Frames must be bytes; encode the text status explicitly.
                status = ("Worker received: %s" % total).encode("ascii")
                sock.send_multipart([b'B', msg_id, status])

            total += 1
    finally:
        sock.close()
# Sending side of the test: a DEALER socket with the fixed identity ``A``
# that connects to the endpoint bound by the worker.
context = zmq.Context.instance()
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b'A')
client.connect("ipc://routing.ipc")

# Launch the worker in its own thread.
_worker_thread = Thread(target=worker)
_worker_thread.start()
def dispatch(r):
    """Send one request through the module-level ``client`` socket.

    The message is sent as two frames, ``[uid, payload]``, where ``uid`` is
    a freshly generated UUID4 in its canonical text form.

    :param r: request payload; it is passed through ``bytes()`` before
        being sent.
    :returns: the ``uid`` byte string that identifies this request.
    """
    # ``bytes(uuid)`` stringifies only on Python 2; encoding the text form
    # gives the same byte string on Python 2 and Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from the module-level ``client`` socket.

    This never blocks.

    :returns: the list of frames of the next queued message, or ``None``
        when nothing is waiting to be received.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Raised by a non-blocking receive when no message is queued.
        # Any other socket error is a real failure and is left to propagate.
        return None
def receiver():
    """Print every message delivered to a DEALER peer with identity ``B``.

    The peer connects to ``ipc://routing.ipc``, sends one two-frame greeting
    (``[b'no-uid', b'hello']``) and then polls its socket with a two-second
    timeout, printing each message it receives.  The loop has no exit
    condition, so this function never returns.
    """
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, b'B')
    sock.connect("ipc://routing.ipc")

    # Initial greeting sent as soon as the socket is connected.
    sock.send_multipart([b'no-uid', b'hello'])

    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    timeout_ms = 2000

    while True:
        events = dict(poller.poll(timeout_ms))
        if events.get(sock) != zmq.POLLIN:
            # Timed out with nothing readable: poll again.
            continue
        reply = sock.recv_multipart(zmq.NOBLOCK)
        print("Received: %s" % (reply,))
# Start the printing peer in its own thread, then pause for a second
# before the main flow begins.
_receiver_thread = Thread(target=receiver)
_receiver_thread.start()
time.sleep(1)
def main():
    """Send ten workloads plus ``END`` and print the table of sent requests.

    ``requests`` maps each request uid to ``{'work': payload}``.  Replies
    are not read here; this function only sends and reports.
    """
    requests = {}

    for i in range(10):
        # bytes has no .format() on Python 3: format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}

    dispatch(b'END')
    time.sleep(1)

    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# ROUTER to DEALER (1->N) | |
# this can be used for a client with many workers | |
import pprint | |
import time | |
import random | |
import uuid | |
from threading import Thread | |
import zmq | |
def process_request(request):
    """Build the reply payload for one request.

    The last byte of ``request`` is read as a decimal digit.  Payloads that
    are empty, are not sliceable, or do not end in a digit count as ``0``.

    :param request: raw message payload (a byte string).
    :returns: the digit as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice rather than index: indexing bytes gives an int on Python 3,
        # and slicing an empty payload yields an empty string, not an error.
        n = int(request[-1:])
    except (TypeError, ValueError):
        n = 0
    # ``bytes(n)`` renders the digit only on Python 2; on Python 3 it would
    # create ``n`` NUL bytes, so format the number explicitly.
    return str(n).encode("ascii")
def worker():
    """Serve requests on a DEALER socket (identity ``A``).

    The socket connects to ``ipc://routing.ipc``.  Every incoming message is
    unpacked as two frames, ``[msg_id, payload]``.  A random choice (two
    chances out of three) decides whether the request is answered with
    ``[msg_id, process_request(payload)]``; otherwise it is left unanswered.
    A ``STATUS`` payload additionally gets a reply with the number of
    requests counted so far, and an ``END`` payload stops the loop.
    """
    context = zmq.Context.instance()
    # Named ``sock`` so the local does not shadow this function's own name.
    sock = context.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, b'A')
    sock.connect("ipc://routing.ipc")

    total = 0
    try:
        while True:
            # Two frames arrive: the message id and the workload.
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            msg_id, msg = request

            if random.choice([True, True, False]):  # should I respond?
                response = process_request(msg)
                sock.send_multipart([msg_id, response])

            if msg == b"END":
                break

            if msg == b'STATUS':
                # Keep the same [msg_id, payload] framing as normal replies
                # so the status can be matched to its request.
                status = ("A received: %s" % total).encode("ascii")
                sock.send_multipart([msg_id, status])

            total += 1
    finally:
        sock.close()
# Client side of the test: a ROUTER socket that binds the IPC endpoint the
# worker connects to.
context = zmq.Context.instance()
client = context.socket(zmq.ROUTER)
client.bind("ipc://routing.ipc")

# Launch the worker in its own thread, then pause for a second so both
# ends have time to settle before any request is sent.
_worker_thread = Thread(target=worker)
_worker_thread.start()
time.sleep(1)
def dispatch(r):
    """Send one request to worker ``A`` through the ``client`` socket.

    The message is sent as three frames, ``[b'A', uid, payload]``: the
    destination identity, a freshly generated UUID4 in its canonical text
    form, and the payload.

    :param r: request payload; it is passed through ``bytes()`` before
        being sent.
    :returns: the ``uid`` byte string that identifies this request.
    """
    to = b'A'
    # ``bytes(uuid)`` stringifies only on Python 2; encoding the text form
    # gives the same byte string on Python 2 and Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([to, uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from the module-level ``client`` socket.

    This never blocks.

    :returns: the list of frames of the next queued message, or ``None``
        when nothing is waiting to be received.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Raised by a non-blocking receive when no message is queued.
        # Any other socket error is a real failure and is left to propagate.
        return None
def main():
    """Send ten workloads plus ``END`` and print which ones were answered.

    ``requests`` maps each request uid to ``{'work': payload}``; a
    ``'response'`` key is added when a reply with that uid is received.
    Replies are indexed as ``[sender, uid, response]``.  They are picked up
    opportunistically after each send and once more after the final pause,
    so late replies still appear in the report.
    """
    requests = {}

    def record(reply):
        """Store a ``[sender, uid, response]`` reply on its request."""
        print("Received:  %s" % (reply,))
        entry = requests.get(reply[1])
        # Replies to untracked requests (such as END) are not part of the
        # report and must not raise KeyError.
        if entry is not None:
            entry['response'] = reply[2]

    for i in range(10):
        # bytes has no .format() on Python 3: format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}

        r = receive()
        if r is not None:
            record(r)

    dispatch(b'END')
    time.sleep(1)

    # Collect whatever arrived while sending and during the pause above.
    r = receive()
    while r is not None:
        record(r)
        r = receive()

    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment