Created
November 30, 2012 02:44
-
-
Save mangrovemike/4173458 to your computer and use it in GitHub Desktop.
Mosquitto to COSM Bridge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| # | |
| # MangroveMike | |
| # | |
| # Read the local MQTT channels and publish to my COSM channels | |
| # | |
| # | |
| # Based upon great code from: | |
| # Andy Piper | |
| # Roger Light | |
| # Kyle Gordon (https://github.com/kylegordon/mqtt-republisher/blob/master/mqtt-republisher.py) | |
| # | |
| import time | |
| # import logging | |
| import mosquitto | |
# Timestamp taken once at start-up and appended to both client ids
# (presumably so that each run presents a distinct id -- confirm).
ticks = time.time()

# Client used by connectAndSetupLocal() to talk to the broker on localhost.
mqttc = mosquitto.Mosquitto("python_sub_local_%s" % ticks, clean_session=True, obj=None)

# Client used by connectAndSetupCOSM() to publish to api.cosm.com.
mqttCosm = mosquitto.Mosquitto("python_pub_cosm_%s" % ticks, clean_session=True, obj=None)
def on_connect(mosq, obj, rc):
    """Connect callback for the local broker client.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        rc: Connection result code; 0 is treated as success.

    Raises:
        Exception: If ``rc`` is non-zero. The message includes the result
            code so the failure can be diagnosed from the traceback.
    """
    if rc != 0:
        # Same exception type as before, but it now says what went wrong.
        raise Exception("NOT Connect local rc: " + str(rc))
    print("Connect local rc: " + str(rc))
def on_connect_cosm(mosq, obj, rc):
    """Connect callback for the COSM client.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        rc: Connection result code; 0 is treated as success.

    Raises:
        Exception: If ``rc`` is non-zero. The failure is printed first and
            the same text is carried in the exception message.
    """
    if rc == 0:
        print("Connect COSM rc: " + str(rc))
        return

    failure = "NOT Connect COSM rc: " + str(rc)
    print(failure)
    # Same exception type as before, but no longer message-less.
    raise Exception(failure)
def on_message(mosq, obj, msg):
    """Print a received message as "<topic> <qos> <payload>".

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        msg: Message object; its ``topic``, ``qos`` and ``payload``
            attributes are read.
    """
    fields = (msg.topic, str(msg.qos), str(msg.payload))
    print(" ".join(fields))
def on_message_minimal(mosq, obj, msg):
    """Print a received message as "<topic> <payload>" (no QoS).

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        msg: Message object; its ``topic`` and ``payload`` attributes are read.
    """
    fields = (msg.topic, str(msg.payload))
    print(" ".join(fields))
# Local MQTT topic -> (label used in the log line, COSM datastream id).
_COSM_DATASTREAMS = {
    "weather/merewether/channel/ext_temperature": ("temp", "0"),
    "weather/merewether/channel/precipitation_daily": ("precipitation_daily", "2"),
    "weather/merewether/channel/ext_humidity": ("humidity", "3"),
    "weather/merewether/channel/pressure": ("pressure", "4"),
    "weather/merewether/channel/wind_speed": ("Wind Speed", "10"),
    "weather/merewether/channel/wind_direction": ("wind direction", "11"),
    "weather/merewether/channel/wind_gust_5_mins": ("wind gust 5 mins", "14"),
    "weather/merewether/channel/solar_radiation": ("solar radiation", "21"),
    "weather/merewether/channel/uv_index": ("UV", "20"),
}


def on_message_publish(mosq, obj, msg):
    """Forward a message from a known local topic to its COSM datastream.

    The topic is looked up in ``_COSM_DATASTREAMS``. Messages on any other
    topic are ignored silently. For a known topic the payload is logged and
    published through the module-level ``mqttCosm`` client to
    ``<apikey>/v2/feeds/<feed>/datastreams/<id>.csv``.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        msg: Message object; its ``topic`` and ``payload`` attributes are read.
    """
    # Placeholders: replace with the real COSM API key and feed id.
    apikey = "cosm_api_key_goes_here"
    cosmStreamRoot = "/v2/feeds/YOURCOSMSTREAM/datastreams/"

    route = _COSM_DATASTREAMS.get(msg.topic)
    if route is None:
        # Not a topic we bridge; nothing to publish.
        return

    label, datastream = route
    payload = str(msg.payload)
    # A separating space after the topic was missing in the original log line.
    print(msg.topic + " publish " + label + " to cosm: " + payload)
    mqttCosm.publish(apikey + cosmStreamRoot + datastream + ".csv", payload)
def on_publish(mosq, obj, mid):
    """Print the id of a message published by the local client.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        mid: Message id reported by the client.
    """
    print("mid: %s" % (mid,))
def on_publish_cosm(mosq, obj, mid):
    """Print the id of a message published to COSM, then the local time.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        mid: Message id reported by the client.
    """
    print("COSM Publish mid: " + str(mid))
    # time.asctime() with no argument formats the current local time.
    # print() call form gives the same output as the old print statement
    # and also parses on Python 3.
    print("Local current time : " + time.asctime())
def on_subscribe(mosq, obj, mid, granted_qos):
    """Print the message id and granted QoS of a completed subscription.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        mid: Message id of the subscribe request.
        granted_qos: QoS value(s) reported by the client for the subscription.
    """
    print(" ".join(("Subscribed:", str(mid), str(granted_qos))))
def on_log(mosq, obj, level, string):
    """Print a log line emitted by an MQTT client.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        level: Severity reported by the client (ignored; every line is printed).
        string: The log text.
    """
    log_line = string
    print(log_line)
def on_disconnect_cosm(mosq, obj, rc):
    """Disconnect callback for the COSM client; rebuilds both links on failure.

    A result code of 0 is only reported. Any other code is treated as an
    unexpected drop: the local client is disconnected, the function sleeps
    for 180 seconds, a fresh COSM client is created and connected, the local
    client is reconnected, and ``mainloop()`` is entered again.

    NOTE(review): the source had lost its indentation; the recovery steps are
    assumed to belong to the non-zero branch only -- confirm.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        rc: Disconnect result code; 0 is treated as a clean disconnect.
    """
    # Without this declaration the assignment below created a function-local
    # name, so the fresh client was thrown away and connectAndSetupCOSM()
    # reconnected the old module-level client instead.
    global mqttCosm

    if rc == 0:
        print("Disconnected COSM successfully.")
        return

    print("Local current time : " + time.asctime())
    print("Disconnected COSM unexpectantly. Result code: " + str(rc))

    # Take the local side down as well while waiting to retry.
    print("Disconnect local")
    mqttc.disconnect()
    print("Sleeping for 180 secs")
    time.sleep(180)

    print("Re-connect to COSM")
    mqttCosm = mosquitto.Mosquitto(
        "python_pub_cosm_" + str(time.time()), clean_session=True, obj=None
    )
    connectAndSetupCOSM()

    print("Re-connect to local")
    connectAndSetupLocal()

    print("Return to main processing")
    print("Local current time before mailloop call : " + time.asctime())
    # NOTE(review): connectAndSetupCOSM() registers this function as the COSM
    # on_disconnect handler and mainloop() drives that client via loop(), so
    # this call presumably nests a new mainloop() inside the running one on
    # every unexpected disconnect. Kept as in the original; consider
    # signalling the outer loop to reconnect instead.
    mainloop()
def on_disconnect(mosq, obj, rc):
    """Disconnect callback for the local client.

    Reports a clean disconnect (``rc == 0``). For any other code it reports
    the code and pauses for five seconds; no reconnect is attempted here.

    NOTE(review): the source had lost its indentation; the pause is assumed
    to belong to the non-zero branch only -- confirm.

    Args:
        mosq: Client instance that fired the callback (unused).
        obj: User data handed over by the client (unused).
        rc: Disconnect result code; 0 is treated as a clean disconnect.
    """
    if rc != 0:
        print("Disconnected local unexpectantly. Result code: " + str(rc))
        time.sleep(5)
        return
    print("Disconnected local successfully.")
def cleanup():
    """Disconnect the COSM client and then the local client.

    The local disconnect is attempted even if the COSM disconnect raises;
    in that case the COSM exception still propagates afterwards.
    """
    print("Ending and cleaning up")
    try:
        mqttCosm.disconnect()
    finally:
        # Previously skipped whenever the COSM disconnect raised.
        mqttc.disconnect()
def connectAndSetupLocal():
    """Wire callbacks onto the local client, connect to it and subscribe.

    Incoming messages are routed to ``on_message_publish``. The client
    connects to ``localhost:1883`` and subscribes to ``"#"`` at QoS 0.
    """
    callbacks = (
        ("on_message", on_message_publish),
        ("on_connect", on_connect),
        ("on_publish", on_publish),
        ("on_subscribe", on_subscribe),
        # Debug logging is switched on; drop this entry to silence it.
        ("on_log", on_log),
    )
    for attr_name, handler in callbacks:
        setattr(mqttc, attr_name, handler)

    mqttc.connect("localhost", 1883, 60)

    # Subscribing to "$SYS/#" instead gets very cool system stats:
    # mqttc.subscribe("$SYS/#", 0)
    mqttc.subscribe("#", 0)
def connectAndSetupCOSM():
    """Wire callbacks onto the COSM client and connect to api.cosm.com:1883.

    Registers the connect, publish, subscribe, disconnect and log callbacks
    on the module-level ``mqttCosm`` client before connecting.
    """
    # Set up a publish link to COSM.
    mqttCosm.on_connect = on_connect_cosm
    mqttCosm.on_publish = on_publish_cosm
    mqttCosm.on_subscribe = on_subscribe
    mqttCosm.on_disconnect = on_disconnect_cosm
    mqttCosm.on_log = on_log

    mqttCosm.connect("api.cosm.com", 1883, 60)
    # NOTE(review): connect()'s return value is not checked, so this line is
    # printed before on_connect_cosm reports the broker's actual result code.
    # print() call form: same output as the old print statement, and valid
    # on Python 3.
    print("Connected to COSM")
def mainloop():
    """Pump both MQTT clients until either ``loop()`` returns non-zero.

    Returns normally once one of the clients reports a non-zero result code.

    Raises:
        KeyboardInterrupt: Re-raised after ``cleanup()`` has run.
        RuntimeError: Re-raised after ``cleanup()`` has run.
        Exception: Any other error is re-raised after the last seen result
            codes are printed; ``cleanup()`` is not called in that case.
    """
    # Initialised before the try so the generic handler can always report them.
    rc = 0
    rc2 = 0
    try:
        while rc == 0 and rc2 == 0:
            rc = mqttc.loop()
            rc2 = mqttCosm.loop()
    except KeyboardInterrupt:
        print("Keyboard Interrupt received")
        cleanup()
        raise
    except RuntimeError:
        print("The candle that burns twice as bring only burns half as long. ")
        cleanup()
        raise
    except Exception:
        print("Unexpected error in mainloop(). Result code 1 is " + str(rc) + ". result code 2 is " + str(rc2))
        raise
# Now do the processing: connect both clients and pump them until one stops.
try:
    connectAndSetupLocal()
    connectAndSetupCOSM()
    mainloop()
except KeyboardInterrupt:
    # mainloop() runs cleanup() itself before re-raising, so cleanup() may
    # run twice; this handler also covers an interrupt during connection
    # set-up.
    print("Keyboard Interrupt received")
    cleanup()
except RuntimeError:
    print("The candle that burns twice as bring only burns half as long. Time to die!")
    cleanup()
except Exception:
    print("Unexpected error:")
    raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment