Skip to content

Instantly share code, notes, and snippets.

@jayrbolton
Created June 24, 2019 17:43
Show Gist options
  • Save jayrbolton/438c840f5855b9ddaf09d3a5687e1eb1 to your computer and use it in GitHub Desktop.
Save jayrbolton/438c840f5855b9ddaf09d3a5687e1eb1 to your computer and use it in GitHub Desktop.
Example of push pull workers with zmq thread queue vs builtin thread queue
import time
import zmq
import zmq.devices
from threading import Thread
# Advantages:
# - Easier to change the architecture around
# - Easier to add other processes, workers, remote workers, etc
# Disadvantages:
# - Need a little knowledge of zmq
# - Need a package
def main():
    """Run two pushers and two pullers through a zmq streamer device.

    A PULL/PUSH ``ThreadDevice`` is bound to two in-process endpoints.
    Pusher threads connect to the frontend endpoint and puller threads to
    the backend endpoint. Every worker is a daemon thread, so this function
    never returns: it sleeps in a loop to keep the process alive.
    """
    frontend_url = 'inproc://frontend'
    backend_url = 'inproc://backend'

    # Device input (PULL) is bound to the frontend, output (PUSH) to the backend.
    device = zmq.devices.ThreadDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in(frontend_url)
    device.bind_out(backend_url)
    device.start()

    # One (target, endpoint) pair per worker; the position is the worker id.
    worker_specs = (
        (pusher, frontend_url),
        (pusher, frontend_url),
        (puller, backend_url),
        (puller, backend_url),
    )
    workers = [
        Thread(target=func, args=(url, worker_id), daemon=True)
        for worker_id, (func, url) in enumerate(worker_specs)
    ]
    for worker in workers:
        worker.start()

    # Daemon threads do not keep the interpreter running, so idle here.
    while True:
        time.sleep(1)
def pusher(sock_url, _id):
    """Connect a PUSH socket to ``sock_url`` and send a message every 5 seconds.

    Never returns. ``_id`` is only used to label the startup message.
    """
    print(f'Pusher {_id} starting..')
    context = zmq.Context.instance()
    sender = context.socket(zmq.PUSH)
    sender.connect(sock_url)
    while True:
        sender.send_string('sup nerds')
        time.sleep(5)
def puller(sock_url, _id):
    """Connect a PULL socket to ``sock_url`` and print every message received.

    Never returns. ``_id`` labels both the startup message and each
    received-message line.
    """
    print(f'Puller {_id} starting..')
    context = zmq.Context.instance()
    receiver = context.socket(zmq.PULL)
    receiver.connect(sock_url)
    while True:
        # recv_string() is called without flags and waits for the next message.
        print(f'Puller {_id} received: {receiver.recv_string()}')
if __name__ == '__main__':
    # Start the demo only when this file is executed as a script.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment