Skip to content

Instantly share code, notes, and snippets.

@jasonmhite
Created April 20, 2017 16:08
Show Gist options
  • Save jasonmhite/5adb190bd04a1f0065172b5e8504a961 to your computer and use it in GitHub Desktop.
Save jasonmhite/5adb190bd04a1f0065172b5e8504a961 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from functools import wraps
from math import isnan

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# Host name of the MQTT broker handed to client.connect().
MQTT_ADDR = 'raspi'
# Wildcard topic; not used by any active code in this file.
TOPIC = "/sensors/#"
# Keyword arguments for InfluxDBClient. The "###" values are presumably
# redacted placeholders -- fill in real connection details before running.
DB_SRV = {
    "host": "###",
    "username": "###",
    "password": "###",
    "database": "sensors",
}
# InfluxDB client used by on_message to write the points built by handlers.
icli = InfluxDBClient(**DB_SRV)
####### Handlers for each sensor class
def nan_wrapper(f):
    """Decorate a sensor handler so that NaN readings are dropped.

    The wrapped handler is called as ``f(sid, data, cal)`` and is expected to
    return either ``None`` (nothing to record) or a list of point dicts, each
    carrying a numeric ``item["fields"]["value"]``.

    Returns:
        A function with the same ``(sid, data, cal)`` signature. It returns
        the handler's payload unchanged, or ``None`` when the handler
        produced no payload or when any point's value is NaN (in which case
        a message naming the value and ``sid`` is printed).
    """

    @wraps(f)
    def wrapped_f(sid, data, cal):
        payload = f(sid, data, cal)
        # A handler may return None to skip a reading; pass that through
        # instead of failing when trying to iterate over it.
        if payload is None:
            return None
        for item in payload:
            value = item["fields"]["value"]
            if isnan(value):
                print("Invalid value {} from {}".format(value, sid))
                return None
        return payload

    return wrapped_f
def handle_debug_error(sid, data, cal):
    """Build the single "crash" point recorded for sensor ``sid``.

    ``data`` and ``cal`` are accepted only to match the common handler
    signature; neither is used. The point's value is always 1.
    """
    crash_point = {
        "measurement": "crash",
        "tags": {"id": sid},
        "fields": {"value": 1},
    }
    return [crash_point]
@nan_wrapper
def handle_ds18b20(sid, data, cal):
    """Turn one DS18B20 reading into a single "temperature" point.

    ``data`` is parsed with ``float`` and ``cal`` is a numeric offset added
    to the reading. Readings of exactly 85.0 or -127.0 are discarded by
    returning ``None`` (presumably the sensor's reset/error sentinels --
    TODO confirm against the sensor firmware).
    """
    reading = float(data)
    if reading in (85.0, -127.0):
        return None
    return [
        {
            "measurement": "temperature",
            "tags": {"id": sid},
            "fields": {"value": reading + cal},
        }
    ]
@nan_wrapper
def handle_dht22(sid, data, cal):
    """Turn a DHT22 "<error_code> <temp> <humid>" message into two points.

    ``data`` is split on whitespace into exactly three fields. ``cal`` is
    indexed as ``cal[0]`` (offset added to the temperature) and ``cal[1]``
    (offset added to the humidity). The error code is attached unchanged as
    the ``err`` tag of both points.

    Returns:
        A list holding a "temperature" point followed by a "humidity" point.

    Raises:
        ValueError: if ``data`` does not split into three fields or a
            reading cannot be parsed by ``float``.
    """
    error_code, temp, humid = data.split()
    payload = [
        {
            "measurement": "temperature",
            "tags": {"id": sid, "err": error_code},
            "fields": {"value": float(temp) + cal[0]},
        },
        {
            "measurement": "humidity",
            # The humidity offset belongs on the value, not on the err tag;
            # this mirrors how handle_dht22_bare applies cal[1].
            "tags": {"id": sid, "err": error_code},
            "fields": {"value": float(humid) + cal[1]},
        },
    ]
    return payload
@nan_wrapper
def handle_dht22_bare(sid, data, cal):
    """Turn a "<temp> <humid>" message into temperature and humidity points.

    ``data`` must split on whitespace into exactly two fields. ``cal[0]`` is
    added to the temperature and ``cal[1]`` to the humidity.
    """
    raw_temp, raw_humid = data.split()
    temperature_point = {
        "measurement": "temperature",
        "tags": {"id": sid},
        "fields": {"value": float(raw_temp) + cal[0]},
    }
    humidity_point = {
        "measurement": "humidity",
        "tags": {"id": sid},
        "fields": {"value": float(raw_humid) + cal[1]},
    }
    return [temperature_point, humidity_point]
@nan_wrapper
def handle_light(sid, data, cal):
    """Turn one light reading into a single "light" point.

    ``data`` is parsed with ``float``; ``cal`` is a numeric offset added to
    the parsed reading.
    """
    level = float(data)
    return [
        {
            "measurement": "light",
            "tags": {"id": sid},
            "fields": {"value": level + cal},
        }
    ]
@nan_wrapper
def handle_ds18_bmp(sid, data, cal):
    """Turn a "<ds18_temp> <bmp_temp>" message into two temperature points.

    ``data`` must split on whitespace into exactly two fields. Both points
    use the "temperature" measurement and are told apart by their
    ``instrument`` tag ("ds18" or "bmp"). ``cal[0]`` is added to the first
    reading and ``cal[1]`` to the second.
    """
    raw_ds18, raw_bmp = data.split()
    ds18_point = {
        "measurement": "temperature",
        "tags": {"id": sid, "instrument": "ds18"},
        "fields": {"value": float(raw_ds18) + cal[0]},
    }
    bmp_point = {
        "measurement": "temperature",
        "tags": {"id": sid, "instrument": "bmp"},
        "fields": {"value": float(raw_bmp) + cal[1]},
    }
    return [ds18_point, bmp_point]
@nan_wrapper
def handle_pressure(sid, data, cal):
    """Turn one pressure reading into a single "pressure" point.

    The value stored is ``float(data) + cal``.
    """
    calibrated = float(data) + cal
    point = {
        "measurement": "pressure",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [point]
@nan_wrapper
def handle_humidity(sid, data, cal):
    """Turn one humidity reading into a single "humidity" point.

    The value stored is ``float(data) + cal``.
    """
    calibrated = float(data) + cal
    point = {
        "measurement": "humidity",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [point]
@nan_wrapper
def handle_battery(sid, data, cal):
    """Turn one battery reading into a single "battery" point.

    The value stored is ``float(data) + cal``.
    """
    calibrated = float(data) + cal
    point = {
        "measurement": "battery",
        "tags": {"id": sid},
        "fields": {"value": calibrated},
    }
    return [point]
# TODO: Make this external?
# Will have to do lookups on the handlers
def handle_motion(sid, data, cal):
    """Turn one motion reading into a single "motion" point.

    ``data`` is parsed with ``float`` and stored as-is; ``cal`` is accepted
    to match the common handler signature but is not applied.
    """
    state = float(data)
    return [
        {
            "measurement": "motion",
            "tags": {"id": sid},
            "fields": {"value": state},
        }
    ]
# Sensor configuration table.
#
# Each top-level key is a topic prefix. Each key under "measurements" is
# joined to that prefix with "/" to form the full MQTT topic (see
# make_dispatch_table and on_connect). For every measurement:
#   "handler"     -- function called as handler(name, payload, calibration)
#   "calibration" -- passed to the handler as `cal`; a number for
#                    single-value handlers, a two-item list for handlers
#                    that index cal[0] / cal[1], or None when unused.
# "name" is what the handler receives as `sid` and writes as the "id" tag.
handlers = {
    "crash": {
        "name": "therm0",
        "measurements": {
            "therm0": {
                "handler": handle_debug_error,
                "calibration": None,
            },
        },
    },
    "controllers/therm0": {
        "name": "meta_temp",
        "measurements": {
            "control/temp": {
                "handler": handle_ds18b20,
                "calibration": 0.0,
            },
        },
    },
    "sensors/therm0": {
        "name": "therm0",
        "measurements": {
            "temp": {
                # Previous configuration, kept for reference:
                # "handler": handle_dht22_bare,
                "handler": handle_ds18b20,
                # "calibration": [-0.15, 0],
                "calibration": 0.0,
            },
        },
    },
    "sensors/temp1": {
        "name": "temp1",
        "measurements": {
            "temp": {
                "handler": handle_ds18b20,
                "calibration": 0.65,
            },
        },
    },
    "sensors/temp2": {
        "name": "temp2",
        "measurements": {
            "temp": {
                "handler": handle_dht22_bare,
                "calibration": [-0.10, 0],
            },
        },
    },
    "sensors/temp3": {
        "name": "temp3",
        "measurements": {
            "temp": {
                "handler": handle_ds18b20,
                "calibration": 0,
            },
            "light": {
                "handler": handle_light,
                "calibration": 0,
            },
        },
    },
    "sensors/temp4": {
        "name": "temp4",
        "measurements": {
            "temp": {
                "handler": handle_ds18b20,
                "calibration": 0,
            },
            "pressure": {
                "handler": handle_pressure,
                "calibration": 0,
            },
            "humidity": {
                "handler": handle_humidity,
                "calibration": 0,
            },
            "battery": {
                "handler": handle_battery,
                "calibration": 0,
            },
        },
    },
    "sensors/temp5": {
        "name": "temp5",
        "measurements": {
            "temp": {
                "handler": handle_ds18b20,
                "calibration": -1.2
            },
        },
    },
    "sensors/temp6": {
        "name": "temp6",
        "measurements": {
            "temp": {
                "handler": handle_ds18b20,
                "calibration": -1.2
            },
        },
    },
    "sensors/light1": {
        "name": "light1",
        "measurements": {
            "light": {
                "handler": handle_light,
                "calibration": 0.0
            }
        }
    },
    # Topic prefix is temp7, but points are recorded under the id "reference".
    "sensors/temp7": {
        "name": "reference",
        "measurements": {
            "temp": {
                "handler": handle_dht22_bare,
                "calibration": [0.0, 0.0]
            },
        }
    },
    "sensors/motion0": {
        "name": "motion0",
        "measurements": {
            "motion": {
                "handler": handle_motion,
                "calibration": 0.0
            }
        }
    }
}
def make_dispatch_table(H):
    """Flatten the nested handler configuration into a per-topic lookup.

    For every key ``base`` in ``H`` and every key ``m`` in that entry's
    "measurements" mapping, the result maps ``"<base>/<m>"`` to a dict with
    the entry's "name", the measurement key, and that measurement's
    "handler" and "calibration".
    """
    return {
        "/".join([base, measurement]): {
            "name": entry["name"],
            "measurement": measurement,
            "handler": spec["handler"],
            "calibration": spec["calibration"],
        }
        for base, entry in H.items()
        for measurement, spec in entry["measurements"].items()
    }
# Full topic -> handler entry lookup, built once when the module loads.
dispatch_table = make_dispatch_table(handlers)


def dispatch(msg):
    """Route an MQTT message to the handler registered for its topic.

    Looks up ``str(msg.topic)`` in ``dispatch_table`` and calls that entry's
    handler with ``(name, msg.payload, calibration)``.

    Returns:
        Whatever the handler returns.

    Raises:
        KeyError: if the topic has no entry in ``dispatch_table``.
    """
    entry = dispatch_table[str(msg.topic)]
    return entry["handler"](entry["name"], msg.payload, entry["calibration"])
def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: subscribe to every configured topic.

    Args:
        client: the MQTT client the callback fired on; used to subscribe.
        userdata: unused.
        flags: unused.
        rc: connection result reported by paho; 0 means the broker accepted
            the connection.

    On success, one topic per (prefix, measurement) pair in ``handlers`` is
    printed and subscribed. On a non-zero ``rc`` the failure is reported and
    nothing is subscribed.
    """
    # A non-zero result code means the broker refused the connection, so
    # claiming "Connected" and subscribing would be misleading.
    if rc != 0:
        print("MQTT connection failed (rc={})".format(rc))
        return

    print("Connected to MQTT broker")
    # Subscribe topic by topic (not to a wildcard) so that only topics with
    # a configured handler are delivered.
    print("Subscribing:")
    for top, v in handlers.items():
        for meas in v["measurements"]:
            topic = "/".join([top, meas])
            print(" -> " + topic)
            client.subscribe(topic)
    print("")
def on_message(client, userdata, msg):
    """paho-mqtt message callback: parse a message and store its points.

    Prints the raw topic and payload, runs the message through ``dispatch``
    and, when a payload comes back, writes it with ``icli.write_points`` and
    prints it. ``client`` and ``userdata`` are unused.
    """
    print(str(msg.topic), str(msg.payload))
    try:
        points = dispatch(msg)
        if points is None:
            # Handler chose to skip this reading; nothing to store.
            return
        icli.write_points(points)
        print(points)
    except Exception as err:
        # Deliberately broad: any failure while dispatching or writing is
        # reported here rather than propagating out of the callback.
        print(" Parse Failure: {}".format(err))
# Script entry: create the MQTT client and attach both callbacks before
# connecting, so they are in place when the first events arrive.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Connect to MQTT_ADDR; 1883 and 60 are passed as the port and keepalive
# arguments of paho's connect().
client.connect(MQTT_ADDR, 1883, 60)
# Hand control to paho's network loop; this call does not return in normal
# operation.
client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment