Skip to content

Instantly share code, notes, and snippets.

@ralight
Last active December 14, 2015 07:08
Show Gist options
  • Select an option

  • Save ralight/5047905 to your computer and use it in GitHub Desktop.

Select an option

Save ralight/5047905 to your computer and use it in GitHub Desktop.
Retained message test, with QoS=2.
#!/usr/bin/python
import mosquitto
import time
# Flag set to True by on_publish when the reported message id equals the
# client's user data; the main script polls it before starting the subscriber.
pub_sent = False
def pub_on_connect(mosq, obj, rc):
    """Publish three retained QoS 2 messages to "/A/B" after connecting.

    The message id returned for the last publish is stored as the client's
    user data, so the publish callback can tell when that final message has
    been reported as published.

    mosq -- client instance whose connection attempt finished
    obj  -- the client's user data (not used here)
    rc   -- connection result code; 0 means the broker accepted the connection
    """
    if rc != 0:
        # A refused connection cannot carry the publishes; say so instead of
        # queueing messages that will never be sent.
        print("publisher connection failed: "+str(rc))
        return

    mosq.publish("/A/B", "message one", 2, True)
    mosq.publish("/A/B", "message two", 2, True)
    # Use a separate name for the publish result so the connection result
    # parameter is not shadowed.
    (result, mid) = mosq.publish("/A/B", "message three", 2, True)
    if result != 0:
        # 0 is the library's success code; anything else means the final
        # message may never be reported as published.
        print("publish failed: "+str(result))
    mosq.user_data_set(mid)
def on_publish(mosq, obj, mid):
    """Report a completed publish and flag completion of the awaited one.

    mosq -- client instance that published the message
    obj  -- the client's user data; pub_on_connect stores the message id of
            the final publish there
    mid  -- message id of the publish being reported
    """
    global pub_sent
    print("published: "+str(mid))
    # Only the publish whose id was stored as user data ends the wait.
    if mid == obj:
        pub_sent = True
def sub_on_connect(mosq, obj, rc):
    """Subscribe to "/A/B" at QoS 2 once the subscriber is connected.

    mosq -- client instance whose connection attempt finished
    obj  -- the client's user data (not used here)
    rc   -- connection result code; 0 means the broker accepted the connection
    """
    if rc != 0:
        # No point subscribing on a connection the broker refused.
        print("subscriber connection failed: "+str(rc))
        return
    mosq.subscribe("/A/B", 2)
def on_message(mosq, obj, msg):
    """Print the topic, QoS and payload of a received message on one line.

    mosq -- client instance that received the message
    obj  -- the client's user data (not used here)
    msg  -- received message; its topic, qos and payload attributes are read
    """
    fields = [msg.topic, str(msg.qos), str(msg.payload)]
    print(" ".join(fields))
# Publisher: send the retained messages and wait until the last one has been
# reported as published before the subscriber is started.
pub = mosquitto.Mosquitto()
pub.on_connect = pub_on_connect
pub.on_publish = on_publish
pub.connect("test.mosquitto.org")
pub.loop_start()
while not pub_sent:
    time.sleep(0.1)
# The publisher has finished its work: close its connection and stop its
# background network loop rather than leaving both running for the rest of
# the script.
pub.disconnect()
pub.loop_stop()

# Subscriber: connect afterwards and print whatever arrives on "/A/B"
# (presumably just the retained message -- that is what this test checks).
sub = mosquitto.Mosquitto()
sub.on_connect = sub_on_connect
sub.on_message = on_message
sub.connect("test.mosquitto.org")
sub.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment