Created
April 20, 2017 16:08
-
-
Save jasonmhite/5adb190bd04a1f0065172b5e8504a961 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import paho.mqtt.client as mqtt | |
from influxdb import InfluxDBClient | |
from math import isnan | |
# Hostname of the MQTT broker this script connects to.
MQTT_ADDR = 'raspi'

# Wildcard topic filter; not referenced by the live code shown in this file.
TOPIC = "/sensors/#"

# Keyword arguments forwarded verbatim to InfluxDBClient.
# NOTE(review): the "###" values look like redacted placeholders -- fill in
# real connection details before running.
DB_SRV = dict(
    host="###",
    username="###",
    password="###",
    database="sensors",
)

# Shared InfluxDB client used to write every decoded sensor point.
icli = InfluxDBClient(**DB_SRV)
####### Handlers for each sensor class | |
def nan_wrapper(f):
    """Decorate a sensor handler so that NaN readings are discarded.

    The wrapped handler is called as ``f(sid, data, cal)``.

    * If the handler returns ``None`` (it rejected the reading itself), the
      ``None`` is passed straight through.
    * Otherwise every point's ``["fields"]["value"]`` is checked with
      ``isnan``; if any of them is NaN a message is printed and the whole
      payload is dropped by returning ``None``.

    Args:
        f: Handler taking ``(sid, data, cal)`` and returning a list of point
            dicts, or ``None``.

    Returns:
        A function with the same ``(sid, data, cal)`` signature returning the
        handler's payload, or ``None`` when there is nothing valid to write.
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    @wraps(f)  # keep the handler's name/docstring for debugging output
    def wrapped_f(sid, data, cal):
        payload = f(sid, data, cal)
        if payload is None:
            # Nothing to validate; iterating None would raise TypeError.
            return None
        for item in payload:
            value = item["fields"]["value"]
            if isnan(value):
                print("Invalid value {} from {}".format(value, sid))
                return None
        return payload

    return wrapped_f
def handle_debug_error(sid, data, cal):
    """Build a single "crash" point tagged with ``sid``.

    ``data`` and ``cal`` are accepted so the signature matches the other
    handlers, but neither is read; the recorded value is always ``1``.
    """
    crash_point = {
        "measurement": "crash",
        "tags": {"id": sid},
        "fields": {"value": 1},
    }
    return [crash_point]
@nan_wrapper
def handle_ds18b20(sid, data, cal):
    """Convert a single temperature reading into one "temperature" point.

    Args:
        sid: Sensor name stored in the point's "id" tag.
        data: Payload convertible with ``float()``.
        cal: Numeric offset added to the reading.

    Returns:
        A one-element list of point dicts, or ``None`` when the reading is
        exactly 85.0 or -127.0.
    """
    reading = float(data)
    # NOTE(review): 85.0 and -127.0 are presumably the sensor's error /
    # sentinel values -- confirm against the device documentation.
    if reading in (85.0, -127.0):
        return None
    return [
        {
            "measurement": "temperature",
            "tags": {"id": sid},
            "fields": {"value": reading + cal},
        },
    ]
@nan_wrapper
def handle_dht22(sid, data, cal):
    """Parse an "<error_code> <temp> <humidity>" reading into two points.

    Args:
        sid: Sensor name stored in each point's "id" tag.
        data: Whitespace-separated payload with exactly three fields.
        cal: Two-item sequence; ``cal[0]`` is added to the temperature and
            ``cal[1]`` to the humidity.

    Returns:
        A list with a "temperature" point and a "humidity" point, both
        carrying the reported error code in their "err" tag.

    Raises:
        ValueError: If ``data`` does not split into three fields or the
            temperature/humidity fields are not numeric.
    """
    error_code, temp, humid = data.split()
    payload = [
        {
            "measurement": "temperature",
            "tags": {"id": sid, "err": error_code},
            "fields": {"value": float(temp) + cal[0]},
        },
        {
            "measurement": "humidity",
            # The calibration offset belongs on the humidity value, not on
            # the error-code tag (text + number would raise TypeError).
            "tags": {"id": sid, "err": error_code},
            "fields": {"value": float(humid) + cal[1]},
        },
    ]
    return payload
@nan_wrapper
def handle_dht22_bare(sid, data, cal):
    """Parse a "<temp> <humidity>" reading into two points.

    Args:
        sid: Sensor name stored in each point's "id" tag.
        data: Whitespace-separated payload with exactly two fields.
        cal: Two-item sequence; ``cal[0]`` offsets the temperature and
            ``cal[1]`` offsets the humidity.

    Returns:
        ``[temperature_point, humidity_point]``.
    """
    raw_temp, raw_humid = data.split()
    temperature_point = {
        "measurement": "temperature",
        "tags": {"id": sid},
        "fields": {"value": float(raw_temp) + cal[0]},
    }
    humidity_point = {
        "measurement": "humidity",
        "tags": {"id": sid},
        "fields": {"value": float(raw_humid) + cal[1]},
    }
    return [temperature_point, humidity_point]
@nan_wrapper
def handle_light(sid, data, cal):
    """Convert a single numeric reading into one "light" point.

    ``data`` is converted with ``float()`` and ``cal`` is added to it; the
    point is tagged with ``sid`` under "id".
    """
    return [
        {
            "measurement": "light",
            "tags": {"id": sid},
            "fields": {"value": float(data) + cal},
        },
    ]
@nan_wrapper
def handle_ds18_bmp(sid, data, cal):
    """Parse a "<ds18 temp> <bmp temp>" reading into two temperature points.

    Both points use the "temperature" measurement and are told apart by the
    "instrument" tag ("ds18" or "bmp"). ``cal[0]`` is added to the first
    field and ``cal[1]`` to the second.
    """
    raw_ds18, raw_bmp = data.split()
    ds18_point = {
        "measurement": "temperature",
        "tags": {"id": sid, "instrument": "ds18"},
        "fields": {"value": float(raw_ds18) + cal[0]},
    }
    bmp_point = {
        "measurement": "temperature",
        "tags": {"id": sid, "instrument": "bmp"},
        "fields": {"value": float(raw_bmp) + cal[1]},
    }
    return [ds18_point, bmp_point]
@nan_wrapper
def handle_pressure(sid, data, cal):
    """Convert a single numeric reading into one "pressure" point.

    ``data`` is converted with ``float()`` and offset by ``cal``.
    """
    calibrated = float(data) + cal
    pressure_point = {
        "measurement": "pressure",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [pressure_point]
@nan_wrapper
def handle_humidity(sid, data, cal):
    """Convert a single numeric reading into one "humidity" point.

    ``data`` is converted with ``float()`` and offset by ``cal``.
    """
    calibrated = float(data) + cal
    humidity_point = {
        "measurement": "humidity",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [humidity_point]
@nan_wrapper
def handle_battery(sid, data, cal):
    """Convert a single numeric reading into one "battery" point.

    ``data`` is converted with ``float()`` and offset by ``cal``.
    """
    calibrated = float(data) + cal
    battery_point = {
        "measurement": "battery",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [battery_point]
# TODO: Make this external? | |
# Will have to do lookups on the handlers | |
def handle_motion(sid, data, cal):
    """Convert a single numeric reading into one "motion" point.

    ``data`` is converted with ``float()``. ``cal`` is accepted for signature
    parity with the other handlers but is not applied.
    """
    motion_point = {
        "measurement": "motion",
        "tags": {"id": sid},
        "fields": {"value": float(data)},
    }
    return [motion_point]
# Sensor routing table.
#
# Each top-level key is a base MQTT topic. Its value holds:
#   "name"         -- identifier passed to the handler as ``sid``
#   "measurements" -- sub-topic -> {"handler": callable, "calibration": value}
# The full topic for an entry is "<base topic>/<sub-topic>".
handlers = {
    "crash": {
        "name": "therm0",
        "measurements": {
            "therm0": {"handler": handle_debug_error, "calibration": None},
        },
    },
    "controllers/therm0": {
        "name": "meta_temp",
        "measurements": {
            "control/temp": {"handler": handle_ds18b20, "calibration": 0.0},
        },
    },
    "sensors/therm0": {
        "name": "therm0",
        "measurements": {
            # Disabled alternative:
            #   "handler": handle_dht22_bare, "calibration": [-0.15, 0]
            "temp": {"handler": handle_ds18b20, "calibration": 0.0},
        },
    },
    "sensors/temp1": {
        "name": "temp1",
        "measurements": {
            "temp": {"handler": handle_ds18b20, "calibration": 0.65},
        },
    },
    "sensors/temp2": {
        "name": "temp2",
        "measurements": {
            "temp": {"handler": handle_dht22_bare, "calibration": [-0.10, 0]},
        },
    },
    "sensors/temp3": {
        "name": "temp3",
        "measurements": {
            "temp": {"handler": handle_ds18b20, "calibration": 0},
            "light": {"handler": handle_light, "calibration": 0},
        },
    },
    "sensors/temp4": {
        "name": "temp4",
        "measurements": {
            "temp": {"handler": handle_ds18b20, "calibration": 0},
            "pressure": {"handler": handle_pressure, "calibration": 0},
            "humidity": {"handler": handle_humidity, "calibration": 0},
            "battery": {"handler": handle_battery, "calibration": 0},
        },
    },
    "sensors/temp5": {
        "name": "temp5",
        "measurements": {
            "temp": {"handler": handle_ds18b20, "calibration": -1.2},
        },
    },
    "sensors/temp6": {
        "name": "temp6",
        "measurements": {
            "temp": {"handler": handle_ds18b20, "calibration": -1.2},
        },
    },
    "sensors/light1": {
        "name": "light1",
        "measurements": {
            "light": {"handler": handle_light, "calibration": 0.0},
        },
    },
    "sensors/temp7": {
        "name": "reference",
        "measurements": {
            "temp": {"handler": handle_dht22_bare, "calibration": [0.0, 0.0]},
        },
    },
    "sensors/motion0": {
        "name": "motion0",
        "measurements": {
            "motion": {"handler": handle_motion, "calibration": 0.0},
        },
    },
}
def make_dispatch_table(H):
    """Flatten a nested handler configuration into a per-topic lookup.

    Args:
        H: Mapping of base topic -> ``{"name": ..., "measurements": {...}}``
            where each measurement maps to a dict with "handler" and
            "calibration" keys.

    Returns:
        A dict keyed by ``"<base topic>/<measurement>"`` whose values hold
        the "name", "measurement", "handler" and "calibration" for that
        topic.
    """
    return {
        "/".join([base, measurement]): {
            "name": group["name"],
            "measurement": measurement,
            "handler": spec["handler"],
            "calibration": spec["calibration"],
        }
        for base, group in H.items()
        for measurement, spec in group["measurements"].items()
    }
# Flat "<base topic>/<measurement>" -> handler entry lookup, built at import.
dispatch_table = make_dispatch_table(handlers)


def dispatch(msg):
    """Route an MQTT message to the handler registered for its topic.

    Args:
        msg: Object exposing ``topic`` and ``payload`` attributes.

    Returns:
        Whatever the selected handler returns when called with the entry's
        name, the raw payload and the entry's calibration.

    Raises:
        KeyError: If the message topic is not in ``dispatch_table``.
    """
    entry = dispatch_table[str(msg.topic)]
    raw = msg.payload
    convert = entry["handler"]
    return convert(entry["name"], raw, entry["calibration"])
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to every configured sensor topic.

    Args:
        client: MQTT client; its ``subscribe`` method is called per topic.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; ``0`` means the broker accepted the
            connection, anything else is treated as a failure.

    On failure a message is printed and nothing is subscribed. On success
    one subscription is made for each "<base topic>/<measurement>" pair in
    ``handlers`` (the ``TOPIC`` wildcard is not used).
    """
    if rc != 0:
        # Do not report "Connected" or subscribe on a refused connection.
        print("MQTT connection failed with result code {}".format(rc))
        return

    print("Connected to MQTT broker")
    print("Subscribing:")
    for top, v in handlers.items():
        for meas in v["measurements"]:
            topic = "/".join([top, meas])
            print(" -> " + topic)
            client.subscribe(topic)
    print("")
def on_message(client, userdata, msg):
    """MQTT message callback: decode a reading and store it in InfluxDB.

    The topic and raw payload are always echoed. The message is then passed
    to ``dispatch``; a ``None`` result means the reading was rejected and
    nothing is written. Otherwise the points are written with
    ``icli.write_points`` and echoed.

    Failures never propagate out of the callback. Decode errors and
    database errors are reported separately, so a storage problem is not
    mislabelled as a parse failure.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Message exposing ``topic`` and ``payload``.
    """
    print(str(msg.topic), str(msg.payload))

    try:
        payload = dispatch(msg)
    except Exception as e:  # best-effort: one bad message must not kill the loop
        print(" Parse Failure: {}".format(e))
        return

    if payload is None:
        return

    try:
        icli.write_points(payload)
    except Exception as e:  # best-effort: keep consuming if the DB is down
        print(" Write Failure: {}".format(e))
        return

    print(payload)
# Create the MQTT client, attach both callbacks, connect to the broker on
# port 1883 with a 60 second keepalive, then hand control to the client loop.
client = mqtt.Client()
client.on_connect, client.on_message = on_connect, on_message
client.connect(MQTT_ADDR, port=1883, keepalive=60)
client.loop_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment