Skip to content

Instantly share code, notes, and snippets.

@thcipriani
Created May 21, 2021 05:23
Show Gist options
  • Save thcipriani/aebc1e6786f7a8e97cfdf145614a04a7 to your computer and use it in GitHub Desktop.
Save thcipriani/aebc1e6786f7a8e97cfdf145614a04a7 to your computer and use it in GitHub Desktop.
RTL SDR settings for my acurite sensors
[Unit]
Description=rtl_433 to mqtt
[Service]
ExecStart=/home/pi/rtl_433_mqtt_relay.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
[Unit]
Description=rtl_433 to syslog
[Service]
ExecStart=/usr/local/bin/rtl_433 -F syslog:127.0.0.1:1433
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
[Unit]
Description=restart rtl_433
[Service]
Type=oneshot
ExecStart=/bin/systemctl --user restart rtl-sdr-syslog
[Unit]
Description=Restart rtl-sdr syslog
[Timer]
OnCalendar=00/4:00
[Install]
WantedBy=timers.target
#!/usr/bin/env python3
"""MQTT monitoring relay for rtl_433 communication."""
# Needs Paho-MQTT https://pypi.python.org/pypi/paho-mqtt
# Option: PEP 3143 - Standard daemon process library
# (use Python 3.x or pip install python-daemon)
# import daemon
from __future__ import print_function
from __future__ import with_statement
import socket
import json
import paho.mqtt.client as mqtt
# Local UDP endpoint this relay listens on for rtl_433's syslog output.
UDP_IP = "127.0.0.1"
UDP_PORT = 1433

# MQTT broker to relay to, and the topic root everything is published under.
MQTT_HOST = "homeassistant.local"
MQTT_PORT = 1883
MQTT_PREFIX = "sensor/rtl_433"

# Credentials passed to the MQTT client before connecting.
MQTT_USER = "mqtt"
MQTT_PASS = "DUMMYMQTTPASSWORD"
def mqtt_connect(client, userdata, flags, rc):
    """Print the broker's reply to a connection attempt.

    Only ``rc`` is used; the remaining parameters are accepted so the
    function matches the callback signature it is installed under.
    """
    status = mqtt.connack_string(rc)
    print("MQTT connected: " + status)
def mqtt_disconnect(client, userdata, rc):
    """Log MQTT disconnects.

    Only ``rc`` is used; ``client`` and ``userdata`` are accepted so the
    function matches the callback signature it is installed under.
    """
    # The disconnect callback receives a client error code (MQTT_ERR_*), not
    # a CONNACK return code, so it has to be decoded with error_string();
    # connack_string() would print a misleading "connection refused" reason.
    print("MQTT disconnected: " + mqtt.error_string(rc))
# UDP socket the relay reads from; it is created and bound at import time
# and shared with the receive loop through this module-level name.
sock = socket.socket(
    family=socket.AF_INET,
    type=socket.SOCK_DGRAM,
    proto=socket.IPPROTO_UDP,
)
sock.bind((UDP_IP, UDP_PORT))
def id_to_room(id_num):
    """Translate a sensor id into the name of the room it reports for.

    Returns the room name, or None when the id is not one of the known ones.
    """
    known_sensors = (
        (1574, 'tyler_office'),
        (12071, 'bedroom'),
        (4663, 'blazey_office'),
    )
    # Compared with == (not hashed) so any value, hashable or not, is accepted.
    for sensor_id, room in known_sensors:
        if id_num == sensor_id:
            return room
    return None
def sanitize(text):
    """Make a name safe to use as a Graphite/MQTT path segment.

    Spaces, slashes and dots become underscores; ampersands are removed.
    """
    substitutions = (
        (" ", "_"),
        ("/", "_"),
        (".", "_"),
        ("&", ""),
    )
    cleaned = text
    for unwanted, replacement in substitutions:
        cleaned = cleaned.replace(unwanted, replacement)
    return cleaned
def publish_sensor_to_mqtt(mqttc, data, line):
    """Publish one Acurite tower reading from rtl_433 to MQTT.

    A message is relayed only when its sanitized, lower-cased ``model`` is
    ``acurite-tower`` and its ``id`` maps to a known room via ``id_to_room``;
    everything else is dropped (unknown ids are logged first).

    Args:
        mqttc: MQTT client; only its ``publish`` method is called.
        data: The decoded JSON payload of the message.
        line: The raw JSON text, republished unchanged on the room topic.
    """
    # Any valid JSON value can reach this point. A membership test on a
    # number raises TypeError, which the receive loop does not catch, so
    # reject everything that is not a JSON object up front.
    if not isinstance(data, dict):
        return

    model = data.get("model")
    # A non-string model cannot be sanitized and can never match anyway.
    if not isinstance(model, str) or sanitize(model).lower() != 'acurite-tower':
        return

    if "id" not in data:
        return
    room = id_to_room(data['id'])
    if room is None:
        print('{}: Unknown id'.format(data['id']))
        return

    path = MQTT_PREFIX + "/" + room

    # (sub-topic, payload) pairs, collected in the order they are published.
    readings = []
    # Battery state is only relayed when it is something other than "OK".
    if "battery" in data and data["battery"] != "OK":
        readings.append(("battery", str(data["battery"])))
    for key, subtopic in (
        ("humidity", "humidity"),
        ("temperature_C", "temperature"),
        ("depth_cm", "depth"),
    ):
        if key in data:
            readings.append((subtopic, data[key]))

    for subtopic, value in readings:
        topic = path + "/" + subtopic
        # Separator added so the logged topic and value are distinguishable.
        print(topic + ": " + str(value))
        mqttc.publish(topic, value)

    # The complete JSON line is also published on the room topic itself.
    print(path + ": " + line)
    mqttc.publish(path, line)
def parse_syslog(line):
    """Return the payload of a received datagram as text.

    The bytes are decoded as ASCII. Text that does not begin with ``<`` is
    returned whole; otherwise it is treated as a syslog record and the last
    field is returned.
    """
    text = line.decode("ascii")
    if not text.startswith("<"):
        return text
    # Expected layout: "<PRI>VER", timestamp, hostname, command, pid, mid,
    # sdata, payload - at most 7 splits leave the payload as one final field.
    return text.split(None, 7)[-1]
def rtl_433_probe():
    """Receive rtl_433 datagrams forever and relay them to MQTT.

    Sets up the MQTT client with the module's credentials and callbacks,
    starts its network loop, then reads datagrams from the module-level
    ``sock`` and hands each decoded JSON payload to
    ``publish_sensor_to_mqtt``. Never returns.
    """
    relay = mqtt.Client()
    relay.on_connect = mqtt_connect
    relay.on_disconnect = mqtt_disconnect
    relay.username_pw_set(MQTT_USER, MQTT_PASS)
    relay.connect_async(MQTT_HOST, MQTT_PORT, 60)
    relay.loop_start()

    while True:
        datagram, _sender = sock.recvfrom(1024)
        try:
            payload = parse_syslog(datagram)
            publish_sensor_to_mqtt(relay, json.loads(payload), payload)
        except ValueError:
            # Undecodable or non-JSON input is skipped on purpose.
            continue
def run():
    """Start the relay in the foreground.

    Running as a daemon is not enabled. The original note sketched wrapping
    the call in ``daemon.DaemonContext(files_preserve=[sock])`` with options
    such as detach_process=True, uid, gid and working_directory.
    """
    rtl_433_probe()


if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment