Created
January 31, 2019 15:42
-
-
Save abhijitmamarde/af5859e4b81c0b178fa14815b1cd131f to your computer and use it in GitHub Desktop.
Pub-sub example using zmq and the multiprocessing module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import baker | |
from multiprocessing import Process | |
@baker.command
def start(name, topicfilter="10001", port="5566"):
    """Subscribe to one topic on the pub-server and print a running average.

    Connects a SUB socket to ``tcp://localhost:<port>``, subscribes with
    ``topicfilter`` and then loops until the process is interrupted or a
    message fails to parse. Each received message is expected to be two
    whitespace-separated fields, ``"<topic> <integer value>"``; the value is
    added to a running total and the average so far is printed.

    Args:
        name: Label for this client; only used in the printed average line.
        topicfilter: Subscription string passed to ``zmq.SUBSCRIBE``.
        port: Port of the publisher on localhost. Must be an integer string,
            because ``int(port)`` is evaluated for the startup message.

    Raises:
        ValueError: If ``port`` is not an integer string, or if a received
            message does not split into exactly two fields with an integer
            second field.
    """
    print(f"listening pub-server at port:{int(port)}; topic:{topicfilter}")

    # Socket to talk to server
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    try:
        subscriber.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
        subscriber.connect(f"tcp://localhost:{port}")

        # Process updates until interrupted, keeping a running average.
        total_value = 0
        update_nbr = 0
        while True:
            message = subscriber.recv_string()
            print("recevied:", message)
            topic, messagedata = message.split()
            total_value += int(messagedata)
            update_nbr += 1
            print(topic, messagedata)
            print(f"{name} avg value for topic {topicfilter} was {total_value / update_nbr}")
    finally:
        # The loop only ends via an exception (Ctrl-C, bad message, ...).
        # Release the socket and context explicitly; the exception itself
        # still propagates to the caller unchanged.
        subscriber.close(linger=0)
        context.term()
@baker.command
def run_clients(client_names, topicfilter="10001", port="5566"):
    """Launch one subscriber process per comma-separated client name.

    The client at position ``i`` (0-based) in ``client_names`` is started
    with the topic ``str(int(topicfilter) + i)``, so consecutive clients
    get consecutive numeric topics. Every client uses the same ``port``.
    The processes are started but not joined here.

    Args:
        client_names: Comma-separated client names, e.g. ``"a,b,c"``.
        topicfilter: Integer string giving the topic of the first client.
        port: Port value handed through unchanged to each client.

    Raises:
        ValueError: If ``topicfilter`` is not an integer string.
    """
    names = client_names.split(",")
    first_topic = int(topicfilter)
    for offset, client in enumerate(names):
        subscription = str(first_topic + offset)
        worker = Process(target=start, args=(client, subscription, port))
        worker.start()
# Script entry point: runs only when this file is executed directly, not on
# import. NOTE(review): baker.run() presumably dispatches the command line to
# the @baker.command functions defined in this file (start, run_clients) --
# confirm against the baker documentation.
if __name__ == "__main__":
    baker.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import random | |
import time | |
import baker | |
@baker.command
def start(port="5566"):
    """Run the pub-server: publish one random ``"<topic> <value>"`` per second.

    Binds a PUB socket on ``tcp://*:<port>`` and loops until interrupted.
    Each iteration picks a topic from ``random.randrange(9999, 10005)``
    (9999 to 10004 inclusive) and a value from
    ``random.randrange(1, 215) - 80`` (-79 to 134 inclusive), prints the
    message, sends it, and sleeps for one second.

    Args:
        port: Port to bind on all interfaces. Must be an integer string,
            because ``int(port)`` is evaluated for the startup message.

    Raises:
        ValueError: If ``port`` is not an integer string.
    """
    print(f"running pub-server on: {int(port)}")

    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    try:
        publisher.bind(f"tcp://*:{port}")
        while True:
            topic = random.randrange(9999, 10005)
            messagedata = random.randrange(1, 215) - 80
            # Build the wire message once so the printed and sent text
            # cannot drift apart.
            message = f"{topic} {messagedata}"
            print(message)
            publisher.send_string(message)
            time.sleep(1)
    finally:
        # The loop only ends via an exception (typically Ctrl-C). Close with
        # linger=0 so shutdown does not wait on undelivered messages, then
        # release the context; the exception still propagates unchanged.
        publisher.close(linger=0)
        context.term()
# Script entry point: runs only when this file is executed directly, not on
# import. NOTE(review): baker.run() presumably dispatches the command line to
# the @baker.command function defined in this file (start) -- confirm against
# the baker documentation.
if __name__ == "__main__":
    baker.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment