Last active
May 2, 2022 12:21
-
-
Save amitkeret/7f303957c8fd1bc14751599974a6a4fa to your computer and use it in GitHub Desktop.
simplified RPi system sensors MQTT announcements (incl. Home Assistant auto-discovery)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# Simplified RPi system sensors MQTT announcements | |
# Inspired by the great work done by @Sennevds | |
# | |
# This script is meant to be executed via crontab. | |
# It does NOT maintain a persistent connection and/or employ a loop function to keep sending messages at defined intervals. | |
# Therefore, in order to make use of the LWT function, the keepAlive flag was added to the connection.
# | |
# Make sure to make your keepAlive time slightly longer than the crontab interval time! | |
# ---------------------------------------------------------------------------
# User configuration
# ---------------------------------------------------------------------------

# MQTT broker credentials and endpoint.
mqttuser = '****'
mqttpwd = '****'
mqtthost = '****'
mqttport = 1883
# Keep-alive interval (seconds) handed to the MQTT connection.
mqttkeepalive = 60

# Raspberry Pi identification and the mount point whose usage is reported.
piname = '****'
disk_path = '/'
# | |
# (Hopefully) no need to edit past this line ------------------------------------------- | |
# | |
import sys, os, re, subprocess, socket, json, time, datetime | |
import paho.mqtt.client as paho | |
# ---------------------------------------------------------------------------
# Sensor readings (sampled once per run)
# ---------------------------------------------------------------------------

# Memory figures in MiB, taken from the last row of `free -t -m`
# (the "Total:" row: total, used, free). The pipe is closed explicitly.
with os.popen('free -t -m') as _free_pipe:
    tot_m, used_m, free_m = map(int, _free_pipe.readlines()[-1].split()[1:])

# Aggregate CPU counters from the "cpu " row of /proc/stat, read directly
# instead of spawning a grep|awk shell pipeline. Column 1 is user time,
# column 3 is system time and column 4 is idle time, i.e. the same fields the
# former awk expression used ($2, $4, $5).
with open('/proc/stat') as _stat_file:
    _cpu_fields = next(
        line for line in _stat_file if line.startswith('cpu ')
    ).split()
_cpu_user = float(_cpu_fields[1])
_cpu_system = float(_cpu_fields[3])
_cpu_idle = float(_cpu_fields[4])
_cpu_busy = _cpu_user + _cpu_system
CPU_Pct = str(round(_cpu_busy * 100 / (_cpu_busy + _cpu_idle), 2))

# CPU usage (percent, one decimal)
cpu = "%.1f" % float(CPU_Pct)

# Memory usage (percent, one decimal): share of the total that is *used*.
# The previous formula, (total - used) / total, reported the unused share.
memory = "%.1f" % (float(used_m) / tot_m * 100)

# Temperature: first decimal number in the `vcgencmd measure_temp` output.
# Raw string with an escaped dot so only a literal "." separates the digits.
treading = subprocess.check_output(['vcgencmd', 'measure_temp']).decode('UTF-8')
temp = str(re.findall(r'\d+\.\d+', treading)[0])

# Clock speed: the second integer in the `vcgencmd measure_clock arm` output,
# divided by 1,000,000.
creading = subprocess.check_output(['vcgencmd', 'measure_clock', 'arm']).decode('UTF-8')
clock = str(int(re.findall(r'\d+', creading)[1]) / 1000000)

# Hostname
host = socket.gethostname()

# Last message (UTC, ISO 8601 with a trailing "Z")
lastmsg = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
# @see https://stackoverflow.com/questions/54458308
def disk_usage(path):
    """Return the used share of the filesystem containing *path*.

    The value is computed from ``os.statvfs`` as
    ``(f_blocks - f_bfree) / f_blocks`` and formatted as a percentage string
    with one decimal place (e.g. ``"42.3"``).

    A filesystem that reports zero total blocks yields ``"0.0"`` instead of
    raising ``ZeroDivisionError``.

    Raises:
        OSError: if *path* cannot be inspected by ``os.statvfs``.
    """
    st = os.statvfs(path)
    total = st.f_blocks * st.f_frsize
    if total == 0:
        # Nothing to measure; avoid dividing by zero.
        return "0.0"
    used = (st.f_blocks - st.f_bfree) * st.f_frsize
    return "%.1f" % (float(used) / total * 100)
# @see https://github.com/Sennevds/system_sensors/blob/master/src/sensors.py#L164
def get_host_ip():
    """Return an IPv4 address of this host as a string.

    A UDP socket is "connected" to 8.8.8.8:80 and the local address chosen
    for that socket is returned. If that fails with ``socket.error`` the
    address the hostname resolves to is used; if resolution fails as well,
    ``'127.0.0.1'`` is returned.
    """
    # Bound before the try block so the cleanup below never touches an
    # unassigned name when socket creation itself fails.
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8', 80))
        return sock.getsockname()[0]
    except socket.error:
        try:
            return socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            return '127.0.0.1'
    finally:
        if sock is not None:
            sock.close()
# Lower-cased device name, used inside topics and unique ids.
pinamelc = piname.lower()
# Base MQTT topic for this device; "/state" and "/availability" hang off it.
topic = "system-sensors/sensor/{0}".format(pinamelc)

# State payload: one entry per sensor, keyed the same way as `ha_sensors`.
sensors = dict(
    temperature=temp,
    clock_speed=clock,
    memory_use=memory,
    cpu_usage=cpu,
    disk_use=disk_usage(disk_path),
    hostname=host,
    last_message=lastmsg,
    ip=get_host_ip(),
)
# Home Assistant discovery metadata, keyed by the same names as `sensors`.
# Each entry provides:
#   name  - display name (prefixed with the device name when announced)
#   icon  - Material Design icon name (announced with an "mdi:" prefix)
#   unit  - optional unit of measurement
#   class - optional Home Assistant device class
ha_sensors = {
    "temperature": {
        "name": "Temperature",
        "unit": "°C",
        "icon": "thermometer",
        "class": "temperature"},
    "clock_speed": {
        "name": "Clock speed",
        # The published clock value is the measured frequency divided by
        # 1,000,000, so the matching unit is MHz rather than Hz.
        "unit": "MHz",
        "icon": "speedometer"},
    "memory_use": {
        "name": "Memory usage",
        "unit": "%",
        "icon": "memory"},
    "cpu_usage": {
        "name": "CPU usage",
        "unit": "%",
        "icon": "cpu-64-bit"},
    "disk_use": {
        "name": "Disk usage",
        "unit": "%",
        "icon": "micro-sd"},
    "hostname": {
        "name": "Hostname",
        "icon": "card-account-details"},
    "last_message": {
        "name": "Last message",
        "icon": "clock-check",
        "class": "timestamp"},
    "ip": {
        "name": "Host IP",
        "icon": "lan"}
}
def add_device_info(config):
    """Attach the device descriptor to *config* and return the same dict.

    The descriptor is stored under the ``'device'`` key and is built from the
    module-level ``piname`` / ``pinamelc`` values. *config* is modified in
    place.
    """
    device = dict(
        identifiers=["{0}_sensor".format(pinamelc)],
        name="{0} Sensors".format(piname),
        model="RPI {0}".format(piname),
        manufacturer="RPI",
    )
    config['device'] = device
    return config
def add_availability_topic(config):
    """Add the availability settings to *config* and return the same dict.

    Sets the availability topic (derived from the module-level ``topic``)
    together with the "online"/"offline" payloads. *config* is modified in
    place.
    """
    config.update({
        "availability_topic": "{0}/availability".format(topic),
        "payload_available": "online",
        "payload_not_available": "offline",
    })
    return config
# ---------------------------------------------------------------------------
# Publish state, Home Assistant discovery configs and availability
# ---------------------------------------------------------------------------
# NOTE(review): paho-mqtt 2.x is believed to require a callback_api_version
# argument to Client() - confirm against the installed version.
client = paho.Client()
client.username_pw_set(mqttuser, mqttpwd)
# The last-will message travels inside the CONNECT packet, so it has to be
# registered before connect(); setting it afterwards has no effect on the
# session that is already being established.
# NOTE(review): a clean disconnect() normally makes the broker discard the
# will - confirm that the "offline" announcement behaves as intended.
client.will_set("%s/availability" % (topic), "offline", 1, True)
client.connect(mqtthost, mqttport, mqttkeepalive)

# Current readings: QoS 1, not retained.
client.publish("%s/state" % (topic), json.dumps(sensors), 1, False)

# One retained discovery config per sensor described in `ha_sensors`.
for index, sensor in ha_sensors.items():
    config = {
        "name": "%s %s" % (piname, sensor['name']),
        "state_topic": "%s/state" % (topic),
        "value_template": "{{ value_json.%s }}" % (index),
        "unique_id": "%s_sensor_%s" % (pinamelc, index),
        "icon": "mdi:%s" % (sensor['icon'])
    }
    # Unit and device class are optional per sensor.
    if "unit" in sensor:
        config['unit_of_measurement'] = sensor['unit']
    if "class" in sensor:
        config['device_class'] = sensor['class']
    payload = json.dumps(add_device_info(add_availability_topic(config)))
    client.publish("homeassistant/sensor/%s/%s/config" % (pinamelc, index), payload, 1, True)

# Retained availability marker.
client.publish("%s/availability" % (topic), "online", 1, True)

# Retained discovery config for a connectivity binary sensor that mirrors the
# availability topic.
client.publish("homeassistant/binary_sensor/%s/connected/config" % (pinamelc), json.dumps(add_device_info({
    "name": "%s Connected" % (sensors['hostname']),
    "state_topic": "%s/availability" % (topic),
    "value_template": "{{ value }}",
    "unique_id": "%s_sensor_connected" % (pinamelc),
    "payload_on": "online",
    "payload_off": "offline",
    "device_class": "connectivity"
})), 1, True)

# No network loop is running; pause briefly before closing the connection
# (presumably to let the broker receive everything that was sent).
time.sleep(1)
client.disconnect()
sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment