Last active
July 10, 2023 02:54
-
-
Save memogarcia/06be90e61a7a39686d54bfa26b8d261a to your computer and use it in GitHub Desktop.
Python ZMQ Push Pull Pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zmq | |
import pprint | |
def result_collector(): | |
context = zmq.Context() | |
results_receiver = context.socket(zmq.PULL) | |
results_receiver.bind("tcp://127.0.0.1:5558") | |
collecter_data = {} | |
for x in xrange(1000): | |
result = results_receiver.recv_json() | |
if collecter_data.has_key(result['consumer']): | |
collecter_data[result['consumer']] = collecter_data[result['consumer']] + 1 | |
else: | |
collecter_data[result['consumer']] = 1 | |
if x == 999: | |
pprint.pprint(collecter_data) | |
result_collector() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zmq | |
import random | |
def consumer(): | |
consumer_id = random.randrange(1,10005) | |
print "I am consumer #%s" % (consumer_id) | |
context = zmq.Context() | |
# recieve work | |
consumer_receiver = context.socket(zmq.PULL) | |
consumer_receiver.connect("tcp://127.0.0.1:5557") | |
# send work | |
consumer_sender = context.socket(zmq.PUSH) | |
consumer_sender.connect("tcp://127.0.0.1:5558") | |
while True: | |
work = consumer_receiver.recv_json() | |
data = work['num'] | |
result = { 'consumer' : consumer_id, 'num' : data} | |
if data%2 == 0: | |
consumer_sender.send_json(result) | |
consumer() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import zmq | |
def producer(): | |
context = zmq.Context() | |
zmq_socket = context.socket(zmq.PUSH) | |
zmq_socket.bind("tcp://127.0.0.1:5557") | |
# Start your result manager and workers before you start your producers | |
for num in xrange(20000): | |
work_message = { 'num' : num } | |
zmq_socket.send_json(work_message) | |
producer() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Push/Pull pattern
Push and Pull sockets let you distribute messages to multiple workers, arranged in a pipeline. A Push socket will distribute sent messages to its Pull clients evenly. This is equivalent to producer/consumer model but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.
Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes.