Created
May 13, 2019 13:56
-
-
Save bradmontgomery/8f1de0e56fa86c29a7daadab1c370c56 to your computer and use it in GitHub Desktop.
Simple example of MQTT publishers / subscriber using paho.mqtt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Spawn a lot of publishers. | |
""" | |
import json | |
import os | |
import random | |
import sys | |
import paho.mqtt.client as mqtt | |
from datetime import datetime | |
from multiprocessing import Process | |
HOST = 'localhost' | |
PORT = 1883 | |
KEEPALIVE = 60 # in seconds | |
# This callback is called when a message that was to be sent using the publish() | |
# call has completed transmission to the broker. | |
def on_publish(client, userdata, mid): | |
print(f"Client: {client.spawned_from_pid} published message: {mid}") | |
def _get_data(pid): | |
ts = datetime.utcnow().strftime("%c") | |
value = random.random() | |
print(f"Publishing [{ts}] {value} from {pid}") | |
return json.dumps({'timestamp': ts, 'value': value, 'pid': pid}) | |
def spawn_client(topic="default"): | |
pid = os.getpid() | |
print(f"Creating client with PID: {pid}") | |
client = mqtt.Client() | |
client.spawned_from_pid = pid | |
client.connect(HOST, PORT, KEEPALIVE) | |
client.on_publish = on_publish | |
client.loop_start() | |
try: | |
while True: | |
client.publish(topic, payload=_get_data(pid), qos=0, retain=False) | |
except KeyboardInterrupt: | |
print(f"Calling loop_stop() for pid: {pid}") | |
client.loop_stop() | |
if __name__ == "__main__": | |
if len(sys.argv) > 2: | |
topic = sys.argv[1] | |
try: | |
n_procs = int(sys.argv[2]) | |
except (ValueError, IndexError): | |
n_procs = 4 | |
procs = [] | |
for _ in range(n_procs): | |
p = Process(target=spawn_client, args=(topic, )) | |
p.start() | |
procs.append(p) | |
# do I need to call .join() if I dont' care about return values? | |
for p in procs: | |
p.join() | |
else: | |
print("USAGE: ./publishers <topic> [n_processes]") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
paho-mqtt==1.4.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
A single-process subscriber. | |
""" | |
import json | |
import sys | |
import paho.mqtt.subscribe as subscribe | |
HOST = 'localhost' | |
PORT = 1883 | |
KEEPALIVE = 60 # in seconds | |
# The callback for when a PUBLISH message is received from the server. | |
def on_message(client, userdata, msg): | |
payload = json.loads(msg.payload.decode('utf8')) | |
ts = payload.get('timestamp') | |
value = payload.get('value') | |
print(f"[{ts}] {value} -- {msg.topic}") | |
if __name__ == "__main__": | |
if len(sys.argv) == 2: | |
topic = sys.argv[1] | |
# for more details, see: | |
# https://github.com/eclipse/paho.mqtt.python#subscribe-1 | |
subscribe.callback( | |
on_message, topic, hostname=HOST, port=PORT, keepalive=KEEPALIVE | |
) | |
else: | |
print("USAGE: ./subscriber.py <topic>") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment