Skip to content

Instantly share code, notes, and snippets.

@mangrovemike
Created November 30, 2012 02:44
Show Gist options
  • Select an option

  • Save mangrovemike/4173458 to your computer and use it in GitHub Desktop.

Select an option

Save mangrovemike/4173458 to your computer and use it in GitHub Desktop.
Mosquitto to COSM Bridge
#!/usr/bin/python
#
# MangroveMike
#
# Read the local MQTT channels and publish to my COSM channels
#
#
# Based upon great code from:
# Andy Piper
# Roger Light
# Kyle Gordon (https://github.com/kylegordon/mqtt-republisher/blob/master/mqtt-republisher.py)
#
import time
# import logging
import mosquitto
ticks = time.time();
mqttc = mosquitto.Mosquitto("python_sub_local_" + str(ticks), clean_session=True, obj=None)
mqttCosm = mosquitto.Mosquitto("python_pub_cosm_" + str(ticks), clean_session=True, obj=None)
def on_connect(mosq, obj, rc):
if rc ==0:
print("Connect local rc: "+str(rc))
else:
raise Exception
def on_connect_cosm(mosq, obj, rc):
if rc ==0:
print("Connect COSM rc: "+str(rc))
else:
print("NOT Connect COSM rc: "+str(rc))
raise Exception
def on_message(mosq, obj, msg):
print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
def on_message_minimal(mosq, obj, msg):
print(msg.topic+" "+str(msg.payload))
def on_message_publish(mosq, obj, msg):
# print(msg.topic+" "+str(msg.payload))
apikey = "cosm_api_key_goes_here"
datastream = "-1"
# The default stream
cosmStreamRoot = "/v2/feeds/YOURCOSMSTREAM/datastreams/"
if msg.topic == "weather/merewether/channel/ext_temperature" :
print(msg.topic + "publish temp to cosm: " + str(msg.payload))
datastream = "0"
if msg.topic == "weather/merewether/channel/precipitation_daily" :
print(msg.topic + "publish precipitation_daily to cosm: " + str(msg.payload))
datastream = "2"
if msg.topic == "weather/merewether/channel/ext_humidity" :
print(msg.topic + "publish humidity to cosm: " + str(msg.payload))
datastream = "3"
if msg.topic == "weather/merewether/channel/pressure" :
print(msg.topic + "publish pressure to cosm: " + str(msg.payload))
datastream = "4"
if msg.topic == "weather/merewether/channel/wind_speed" :
print(msg.topic + "publish Wind Speed to cosm: " + str(msg.payload))
datastream = "10"
if msg.topic == "weather/merewether/channel/wind_direction" :
print(msg.topic + "publish wind direction to cosm: " + str(msg.payload))
datastream = "11"
if msg.topic == "weather/merewether/channel/wind_gust_5_mins" :
print(msg.topic + "publish wind gust 5 mins to cosm: " + str(msg.payload))
datastream = "14"
if msg.topic == "weather/merewether/channel/solar_radiation" :
print(msg.topic + "publish solar radiation to cosm: " + str(msg.payload))
datastream = "21"
if msg.topic == "weather/merewether/channel/uv_index" :
print(msg.topic + "publish UV to cosm: " + str(msg.payload))
datastream = "20"
# Now publish the data
if datastream <> "-1" :
# mqttCosm.connect("api.cosm.com", 1883, 10)
mqttCosm.publish(apikey + cosmStreamRoot + datastream + ".csv", str(msg.payload))
# mqttCosm.disconnect()
def on_publish(mosq, obj, mid):
print("mid: "+str(mid))
def on_publish_cosm(mosq, obj, mid):
print("COSM Publish mid: "+str(mid))
localtime = time.asctime( time.localtime(time.time()) )
print "Local current time :", localtime
def on_subscribe(mosq, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_log(mosq, obj, level, string):
print(string)
def on_disconnect_cosm (mosq, obj, rc):
if rc == 0:
print("Disconnected COSM successfully.")
else:
localtime = time.asctime( time.localtime(time.time()) )
print "Local current time :", localtime
print("Disconnected COSM unexpectantly. Result code: "+str(rc))
# reconnect
print "Disconnect local"
mqttc.disconnect()
print "Sleeping for 180 secs"
time.sleep(180)
# cleanup()
print("Re-connect to COSM")
ticks = time.time();
mqttCosm = mosquitto.Mosquitto("python_pub_cosm_" + str(ticks), clean_session=True, obj=None)
connectAndSetupCOSM()
# time.sleep(45)
print("Re-connect to local")
connectAndSetupLocal()
print("Return to main processing")
localtime = time.asctime( time.localtime(time.time()) )
print "Local current time before mailloop call :", localtime
mainloop()
def on_disconnect (mosq, obj, rc):
if rc == 0:
print("Disconnected local successfully.")
else:
print("Disconnected local unexpectantly. Result code: "+str(rc))
# reconnect
time.sleep(5)
# mainProcessing()
def cleanup():
print "Ending and cleaning up"
mqttCosm.disconnect()
mqttc.disconnect()
def connectAndSetupLocal():
mqttc.on_message = on_message_publish
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Uncomment to enable debug messages
mqttc.on_log = on_log
mqttc.connect("localhost", 1883, 60)
# This gets very cool system stats
# mqttc.subscribe("$SYS/#", 0)
mqttc.subscribe("#", 0)
def connectAndSetupCOSM():
# setup a publish link to cosm
mqttCosm.on_connect = on_connect_cosm
mqttCosm.on_publish = on_publish_cosm
mqttCosm.on_subscribe = on_subscribe
mqttCosm.on_disconnect = on_disconnect_cosm
mqttCosm.on_log = on_log
mqttCosm.connect("api.cosm.com", 1883, 60)
print "Connected to COSM"
def mainloop():
try:
rc = 0
rc2 = 0
while rc == 0 and rc2 == 0:
rc = mqttc.loop()
rc2 = mqttCosm.loop()
# print("rc: "+str(rc))
# print(str(rc))
except (KeyboardInterrupt):
print "Keyboard Interrupt received"
cleanup()
raise
except (RuntimeError):
print "The candle that burns twice as bring only burns half as long. "
cleanup()
raise
except:
print "Unexpected error in mainloop(). Result code 1 is "+str(rc)+ ". result code 2 is "+str(rc2)
raise
# Now do the processing
try:
connectAndSetupLocal()
connectAndSetupCOSM()
mainloop()
except (KeyboardInterrupt):
print "Keyboard Interrupt received"
cleanup()
except (RuntimeError):
print "The candle that burns twice as bring only burns half as long. Time to die!"
cleanup()
except:
print "Unexpected error:"
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment