Skip to content

Instantly share code, notes, and snippets.

@ivanalejandro0
Created January 19, 2016 15:26
Show Gist options
  • Save ivanalejandro0/dd036c70d21349dff950 to your computer and use it in GitHub Desktop.
Save ivanalejandro0/dd036c70d21349dff950 to your computer and use it in GitHub Desktop.
ROUTER-DEALER zmq tests
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DEALER to ROUTER (N->1)
# this can be used by many clients to share a central worker
import pprint
import random
import time
import uuid
from threading import Thread
import zmq
def process_request(request):
    """Build the reply payload for one request.

    The reply is the decimal value of the request's last character (the
    workload number in this demo), or ``0`` when that character is not a
    digit or the request is empty.

    :param request: raw message payload (byte string).
    :returns: the number as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice instead of index: indexing bytes yields an int on Python 3,
        # which would turn b"7" into 55 rather than 7.
        n = int(request[-1:])
    except (TypeError, ValueError):
        # Empty payload, non-digit last character or unsliceable input.
        n = 0
    # bytes(n) builds n NUL bytes on Python 3; encode the text form instead
    # (identical to the old result on Python 2).
    return str(n).encode("ascii")
def worker():
    """Serve requests on a ROUTER socket until an ``END`` payload arrives.

    Every request is expected as three frames ``[client_id, msg_id, payload]``.
    About two out of three requests (chosen at random) are answered with
    ``[client_id, msg_id, process_request(payload)]``; the others get no
    reply.  A ``STATUS`` payload is additionally answered with the number of
    requests handled so far.

    :raises ValueError: if a request does not consist of exactly three frames.
    """
    context = zmq.Context.instance()
    sock = context.socket(zmq.ROUTER)
    sock.connect("ipc://routing.ipc")
    handled = 0
    try:
        while True:
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            client_id, msg_id, msg = request
            if random.choice([True, True, False]):  # should I respond?
                sock.send_multipart([client_id, msg_id, process_request(msg)])
            if msg == b"END":
                break
            if msg == b'STATUS':
                # A ROUTER needs the peer identity as the first frame, and the
                # client matches replies on msg_id, so send a full envelope
                # instead of a bare string.
                status = ("A received: %s" % handled).encode("ascii")
                sock.send_multipart([client_id, msg_id, status])
            handled += 1
    finally:
        # Release the socket whether the loop ended normally or raised.
        sock.close()
# Requesting side of the demo: a DEALER socket with the fixed identity "A"
# binds the endpoint that the worker thread connects to.
context = zmq.Context.instance()
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b'A')
client.bind("ipc://routing.ipc")

worker_thread = Thread(target=worker)
worker_thread.start()

# Pause so the worker thread is up before the first request is dispatched.
time.sleep(1)
def dispatch(r):
    """Send one request through the module-level ``client`` socket.

    The message goes out as two frames ``[uid, payload]``; ``uid`` is a newly
    generated UUID that lets the caller match a reply to this request.

    :param r: request payload; passed through ``bytes()`` before sending.
    :returns: the request id as an ASCII byte string.
    """
    # bytes(uuid) only works on Python 2 (where bytes is str).  Encoding the
    # text form yields the same value there and also works on Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from ``client`` without blocking.

    :returns: the list of frames of the next message, or ``None`` when
        nothing is waiting to be read.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Nothing queued right now.  Any other socket error is a real
        # failure and is allowed to propagate instead of being hidden.
        return None
def _record_pending(requests):
    """Store every reply currently waiting on the client under its request id."""
    while True:
        reply = receive()
        if reply is None:
            return
        print("Received:  %s" % (reply,))
        ruid, response = reply[0], reply[1]
        # Replies to untracked requests (such as the final END) have no entry.
        if ruid in requests:
            requests[ruid]['response'] = response


def main():
    """Dispatch ten workloads plus ``END`` and print a request/response report.

    Replies are collected without blocking after each dispatch and once more
    after the final wait, so replies that were still in flight while the
    loop ran are included in the report.
    """
    requests = {}
    for i in range(10):
        # bytes has no .format() on Python 3; format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}
        _record_pending(requests)
    dispatch(b'END')
    time.sleep(1)
    # Pick up whatever arrived after the last non-blocking check.
    _record_pending(requests)
    print("")
    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DEALER to ROUTER (N->1)
# this can be used by many clients to share a central worker
import pprint
import random
import time
import uuid
from threading import Thread
import zmq
def process_request(request):
    """Build the reply payload for one request.

    The reply is the decimal value of the request's last character (the
    workload number in this demo), or ``0`` when that character is not a
    digit or the request is empty.

    :param request: raw message payload (byte string).
    :returns: the number as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice instead of index: indexing bytes yields an int on Python 3,
        # which would turn b"7" into 55 rather than 7.
        n = int(request[-1:])
    except (TypeError, ValueError):
        # Empty payload, non-digit last character or unsliceable input.
        n = 0
    # bytes(n) builds n NUL bytes on Python 3; encode the text form instead
    # (identical to the old result on Python 2).
    return str(n).encode("ascii")
def worker():
    """Serve requests on a bound ROUTER socket until an ``END`` payload arrives.

    Every request is expected as three frames ``[client_id, msg_id, payload]``.
    About two out of three requests (chosen at random) are answered, and a
    ``STATUS`` payload is additionally answered with the number of requests
    handled so far.  All replies are addressed to the fixed peer identity
    ``B`` rather than to the sender of the request.

    :raises ValueError: if a request does not consist of exactly three frames.
    """
    context = zmq.Context.instance()
    sock = context.socket(zmq.ROUTER)
    sock.bind("ipc://routing.ipc")
    handled = 0
    try:
        while True:
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            # The sender identity is unpacked only to validate the frame
            # count; replies go to b'B', not back to the sender.
            _sender, msg_id, msg = request
            if random.choice([True, True, False]):  # should I respond?
                sock.send_multipart([b'B', msg_id, process_request(msg)])
            if msg == b"END":
                break
            if msg == b'STATUS':
                # Frames must be bytes; encode so this also works on Python 3.
                status = ("Worker received: %s" % handled).encode("ascii")
                sock.send_multipart([b'B', msg_id, status])
            handled += 1
    finally:
        # Release the socket whether the loop ended normally or raised.
        sock.close()
# Requesting side of the demo: a DEALER socket with the fixed identity "A"
# connects to the endpoint that the worker thread binds.
context = zmq.Context.instance()
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b'A')
client.connect("ipc://routing.ipc")

worker_thread = Thread(target=worker)
worker_thread.start()
def dispatch(r):
    """Send one request through the module-level ``client`` socket.

    The message goes out as two frames ``[uid, payload]``; ``uid`` is a newly
    generated UUID that identifies this request.

    :param r: request payload; passed through ``bytes()`` before sending.
    :returns: the request id as an ASCII byte string.
    """
    # bytes(uuid) only works on Python 2 (where bytes is str).  Encoding the
    # text form yields the same value there and also works on Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from ``client`` without blocking.

    :returns: the list of frames of the next message, or ``None`` when
        nothing is waiting to be read.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Nothing queued right now.  Any other socket error is a real
        # failure and is allowed to propagate instead of being hidden.
        return None
def receiver():
    """Print every message delivered to a DEALER socket with identity ``B``.

    The socket connects to the shared endpoint, sends one
    ``[b'no-uid', b'hello']`` message, then polls for incoming messages and
    prints each one.  The loop has no exit condition, so this function only
    returns by raising.
    """
    context = zmq.Context.instance()
    sock = context.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, b'B')
    sock.connect("ipc://routing.ipc")
    try:
        sock.send_multipart([b'no-uid', b'hello'])
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        poll_timeout = 2000  # milliseconds
        while True:
            events = dict(poller.poll(poll_timeout))
            # Poll results are bit masks, so test the POLLIN bit rather than
            # comparing the whole value for equality.
            if events.get(sock, 0) & zmq.POLLIN:
                reply = sock.recv_multipart(zmq.NOBLOCK)
                print("Received: %s" % (reply,))
    finally:
        # Only reached when the loop raises; do not leak the socket then.
        sock.close()
# receiver() loops forever, so run it as a daemon thread: a regular thread
# would keep the interpreter alive after main() has finished.
receiver_thread = Thread(target=receiver)
receiver_thread.daemon = True
receiver_thread.start()

# Pause so the receiver thread is up before requests are dispatched.
time.sleep(1)
def main():
    """Dispatch ten workloads plus ``END`` and print what was sent.

    This function never reads replies itself, so the report lists only the
    dispatched work per request id.
    """
    requests = {}
    for i in range(10):
        # bytes has no .format() on Python 3; format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}
    dispatch(b'END')
    time.sleep(1)
    print("")
    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ROUTER to DEALER (1->N)
# this can be used for a client with many workers
import pprint
import time
import random
import uuid
from threading import Thread
import zmq
def process_request(request):
    """Build the reply payload for one request.

    The reply is the decimal value of the request's last character (the
    workload number in this demo), or ``0`` when that character is not a
    digit or the request is empty.

    :param request: raw message payload (byte string).
    :returns: the number as an ASCII byte string, e.g. ``b"7"``.
    """
    try:
        # Slice instead of index: indexing bytes yields an int on Python 3,
        # which would turn b"7" into 55 rather than 7.
        n = int(request[-1:])
    except (TypeError, ValueError):
        # Empty payload, non-digit last character or unsliceable input.
        n = 0
    # bytes(n) builds n NUL bytes on Python 3; encode the text form instead
    # (identical to the old result on Python 2).
    return str(n).encode("ascii")
def worker():
    """Serve requests on a DEALER socket (identity ``A``) until ``END`` arrives.

    Every request is expected as two frames ``[msg_id, payload]``.  About two
    out of three requests (chosen at random) are answered with
    ``[msg_id, process_request(payload)]``; the others get no reply.  A
    ``STATUS`` payload is additionally answered with the number of requests
    handled so far.

    :raises ValueError: if a request does not consist of exactly two frames.
    """
    context = zmq.Context.instance()
    sock = context.socket(zmq.DEALER)
    sock.setsockopt(zmq.IDENTITY, b'A')
    sock.connect("ipc://routing.ipc")
    handled = 0
    try:
        while True:
            request = sock.recv_multipart()
            print("Worker - request:  %s" % (request,))
            msg_id, msg = request
            if random.choice([True, True, False]):  # should I respond?
                sock.send_multipart([msg_id, process_request(msg)])
            if msg == b"END":
                break
            if msg == b'STATUS':
                # Use the same [msg_id, payload] shape as ordinary replies;
                # the client indexes both frames, so a lone frame breaks it.
                status = ("A received: %s" % handled).encode("ascii")
                sock.send_multipart([msg_id, status])
            handled += 1
    finally:
        # Release the socket whether the loop ended normally or raised.
        sock.close()
# Requesting side of the demo: a ROUTER socket binds the endpoint that the
# worker thread's DEALER socket connects to.
context = zmq.Context.instance()
client = context.socket(zmq.ROUTER)
client.bind("ipc://routing.ipc")

worker_thread = Thread(target=worker)
worker_thread.start()

# Pause so the worker thread is up before the first request is dispatched.
time.sleep(1)
def dispatch(r):
    """Send one request to worker ``A`` through the module-level ``client``.

    The message goes out as three frames ``[b'A', uid, payload]``; the first
    frame is the destination identity and ``uid`` is a newly generated UUID
    that lets the caller match a reply to this request.

    :param r: request payload; passed through ``bytes()`` before sending.
    :returns: the request id as an ASCII byte string.
    """
    to = b'A'
    # bytes(uuid) only works on Python 2 (where bytes is str).  Encoding the
    # text form yields the same value there and also works on Python 3.
    uid = str(uuid.uuid4()).encode("ascii")
    client.send_multipart([to, uid, bytes(r)])
    return uid
def receive():
    """Fetch one pending message from ``client`` without blocking.

    :returns: the list of frames of the next message, or ``None`` when
        nothing is waiting to be read.
    """
    try:
        return client.recv_multipart(zmq.NOBLOCK)
    except zmq.Again:
        # Nothing queued right now.  Any other socket error is a real
        # failure and is allowed to propagate instead of being hidden.
        return None
def _record_pending(requests):
    """Store every reply currently waiting on the client under its request id."""
    while True:
        reply = receive()
        if reply is None:
            return
        print("Received:  %s" % (reply,))
        # Frame 0 is the sender identity; the request id and payload follow.
        ruid, response = reply[1], reply[2]
        # Replies to untracked requests (such as the final END) have no entry.
        if ruid in requests:
            requests[ruid]['response'] = response


def main():
    """Dispatch ten workloads plus ``END`` and print a request/response report.

    Replies are collected without blocking after each dispatch and once more
    after the final wait, so replies that were still in flight while the
    loop ran are included in the report.
    """
    requests = {}
    for i in range(10):
        # bytes has no .format() on Python 3; format as text, then encode.
        work = "This is the workload {0}".format(i).encode("ascii")
        uid = dispatch(work)
        requests[uid] = {'work': work}
        _record_pending(requests)
    dispatch(b'END')
    time.sleep(1)
    # Pick up whatever arrived after the last non-blocking check.
    _record_pending(requests)
    print("")
    print("Report:")
    print('-' * 20)
    pprint.pprint(requests)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment