Created
June 24, 2019 17:43
-
-
Save jayrbolton/438c840f5855b9ddaf09d3a5687e1eb1 to your computer and use it in GitHub Desktop.
Example of push pull workers with zmq thread queue vs builtin thread queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zmq | |
import zmq.devices | |
from threading import Thread | |
# Advantages: | |
# - Easier to change the architecture around | |
# - Easier to add other processes, workers, remote workers, etc | |
# Disadvantages: | |
# - Need a little knowledge of zmq | |
# - Need a package | |
def main(): | |
frontend_url = 'inproc://frontend' | |
backend_url = 'inproc://backend' | |
streamer = zmq.devices.ThreadDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH) | |
streamer.bind_in(frontend_url) | |
streamer.bind_out(backend_url) | |
streamer.start() | |
threads = [ | |
Thread(target=pusher, args=(frontend_url, 0), daemon=True), | |
Thread(target=pusher, args=(frontend_url, 1), daemon=True), | |
Thread(target=puller, args=(backend_url, 2), daemon=True), | |
Thread(target=puller, args=(backend_url, 3), daemon=True), | |
] | |
for t in threads: | |
t.start() | |
while True: | |
time.sleep(1) | |
def pusher(sock_url, _id): | |
print(f'Pusher {_id} starting..') | |
sock = zmq.Context.instance().socket(zmq.PUSH) | |
sock.connect(sock_url) | |
while True: | |
sock.send_string('sup nerds') | |
time.sleep(5) | |
def puller(sock_url, _id): | |
print(f'Puller {_id} starting..') | |
sock = zmq.Context.instance().socket(zmq.PULL) | |
sock.connect(sock_url) | |
while True: | |
msg = sock.recv_string() | |
print(f'Puller {_id} received: {msg}') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment