Last active
December 9, 2023 23:43
-
-
Save ecolss/fe0557bfe362b83deb2bbcba0f0f6738 to your computer and use it in GitHub Desktop.
zeromq router-to-router example code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
import random | |
import sys | |
import zmq | |
def worker_proc(worker_addr, id_, proc_factory, proc_kwargs):
    """Serve requests handed out by the broker, forever.

    Connects a REQ socket to ``worker_addr``, announces availability with a
    ``READY`` message, then loops: receive a request, run it through the
    processor built by ``proc_factory(**proc_kwargs)``, and send the result
    back prefixed with the address frame that arrived with the request.

    Args:
        worker_addr: ZeroMQ endpoint to connect to.
        id_: Value interpolated into this worker's socket identity.
        proc_factory: Callable invoked as ``proc_factory(**proc_kwargs)``;
            its return value is called with the list of request frames and
            must return an iterable of frames to send back.
        proc_kwargs: Keyword arguments forwarded to ``proc_factory``.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # NOTE(review): "woker_" looks like a typo for "worker_"; left unchanged
    # because the identity is a runtime value visible to the peer.
    sock.identity = f"woker_{id_}".encode()
    sock.connect(worker_addr)
    proc = proc_factory(**proc_kwargs)
    sock.send(b"READY")
    while True:
        # Receive outside the try block: if the receive itself fails there is
        # no request address to reply to, so the error handler below must
        # never run with ``addr`` unbound or left over from a previous
        # request.
        addr, _, *data = sock.recv_multipart()
        try:
            res = proc(data)
            sock.send_multipart([addr, b"", *res])
        except Exception as err:
            # Report the failure to the requester so it is not left waiting.
            print(f"Exception caught, {err}")
            sock.send_multipart([addr, b"", f"Exception: {err}".encode()])
def client_proc(client_addr, id_):
    """Send a single ``HELLO`` request and print the reply.

    Args:
        client_addr: ZeroMQ endpoint to connect the REQ socket to.
        id_: Label included in the printed output to tell clients apart.
    """
    context = zmq.Context()
    requester = context.socket(zmq.REQ)
    requester.connect(client_addr)

    # One request/reply round trip; recv() blocks until a reply arrives.
    requester.send(b"HELLO")
    reply = requester.recv()

    print(f"client {id_} recv: {reply}")
def run_r2r_poller(worker_addr, client_addr):
    """Run a ROUTER-to-ROUTER broker that hands client requests to idle workers.

    Binds a ``frontend`` ROUTER socket on ``client_addr`` and a ``backend``
    ROUTER socket on ``worker_addr``, then loops forever. Every message from
    a worker marks that worker as idle again; unless it is the initial
    ``READY`` announcement it also carries a reply, which is forwarded to the
    client named in the reply's own envelope. Client requests are forwarded
    to the longest-idle worker.

    Args:
        worker_addr: ZeroMQ endpoint the backend (worker-facing) socket binds.
        client_addr: ZeroMQ endpoint the frontend (client-facing) socket binds.
    """
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(client_addr)
    backend = context.socket(zmq.ROUTER)
    backend.bind(worker_addr)

    workers = []  # identities of idle workers, oldest first
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    # The frontend is polled only while at least one worker is idle.
    # Otherwise pending client requests would make poll() return immediately
    # and the loop would spin without being able to dispatch anything.
    frontend_polled = False
    try:
        while True:
            sockets = dict(poller.poll())

            if backend in sockets:
                # Frames: [worker identity, b"", *payload], where payload is
                # either [b"READY"] or [client identity, b"", *reply].
                worker, _, *payload = backend.recv_multipart()
                workers.append(worker)
                if not frontend_polled:
                    poller.register(frontend, zmq.POLLIN)
                    frontend_polled = True
                if payload[:1] != [b"READY"]:
                    # Route the reply to the client named in the worker's
                    # message rather than to whichever client happened to
                    # send the most recent request.
                    client, _, *reply = payload
                    frontend.send_multipart([client, b"", *reply])

            # The frontend can only appear here if it was registered when
            # poll() was called, i.e. at least one worker was idle then, so
            # the pop below cannot hit an empty list.
            if frontend in sockets:
                client, _, *data = frontend.recv_multipart()
                worker = workers.pop(0)
                backend.send_multipart([worker, b"", client, b"", *data])
                if not workers:
                    poller.unregister(frontend)
                    frontend_polled = False
    finally:
        # Release the bound endpoints if the loop is interrupted.
        frontend.close()
        backend.close()
def factory(**kwargs):
    """Build the placeholder request processor.

    Any keyword arguments are accepted and ignored.

    Returns:
        A one-argument callable that ignores its argument and returns
        ``[b"DONE"]``.
    """

    def dummy_fn(_request):
        # A fresh single-frame list is built on every call.
        reply = [b"DONE"]
        return reply

    return dummy_fn
def main(mode):
    """Start one role of the example, chosen by ``mode``.

    Args:
        mode: ``0`` starts two client processes, ``1`` starts two worker
            processes, ``2`` runs the broker loop in this process. Any other
            value does nothing.
    """
    worker_addr = "ipc://workers"
    client_addr = "ipc://clients"

    if mode == 2:
        run_r2r_poller(worker_addr, client_addr)
        return

    if mode == 0:
        procs = [
            mp.Process(target=client_proc, args=(client_addr, idx))
            for idx in range(2)
        ]
    elif mode == 1:
        procs = [
            mp.Process(
                target=worker_proc,
                args=(worker_addr, idx, factory, {}),
            )
            for idx in range(2)
        ]
    else:
        # Unrecognised mode: nothing to launch.
        return

    # All processes are created first and only then started, one by one.
    for proc in procs:
        proc.start()
if __name__ == "__main__":
    # The first command-line argument selects the role passed to main().
    # A missing or non-integer argument exits with a usage message instead
    # of an IndexError/ValueError traceback.
    _usage = f"usage: {sys.argv[0]} MODE  (0: clients, 1: workers, 2: broker)"
    if len(sys.argv) < 2:
        sys.exit(_usage)
    try:
        mode = int(sys.argv[1])
    except ValueError:
        sys.exit(_usage)
    main(mode)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment