Created
August 14, 2019 12:49
-
-
Save bgulla/c23c39383b99f4ac28ce8d2987a954fe to your computer and use it in GitHub Desktop.
Reading electric meters using a usb-sdr
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import sys | |
import os | |
import platform | |
import time | |
import subprocess | |
import json | |
import socket | |
import logging | |
from influxdb import InfluxDBClient | |
import os.path | |
import paho.mqtt.publish as publish | |
import paho.mqtt.client as paho | |
# Default location of the rtlamr binary and the architecture label that is
# printed at startup.
RTLAMR_PATH = '/usr/bin/rtlamr'
ARCH = "x86_64"
# Only the machine field is inspected. Scanning every platform.uname() field
# appended "-arm" once per matching field (machine and processor can both
# report e.g. 'armv7l') and also matched a hostname containing 'armv'.
_machine = platform.machine()
if 'armv' in _machine:
    RTLAMR_PATH = RTLAMR_PATH + "-arm"
    ARCH = _machine
# TODO: get rid of this (HOSTNAME is not referenced in the visible code;
# confirm it is unused before removing).
# Host label: the HOSTNAME environment variable when set, else 'housemeter'.
# os.getenv() with a default does not raise, so the earlier platform.node()
# assignment was always overwritten and the bare ``except`` never fired.
HOSTNAME = os.getenv('HOSTNAME', 'housemeter')
def send_mqtt(topic, payload):
    """Publish a single message to the MQTT broker and log the outcome.

    The broker address comes from the MQTT_HOST and MQTT_PORT environment
    variables, falling back to "localhost" and 1883 (the defaults the
    script's startup section uses). Publishing is best-effort: any failure
    is logged and swallowed so the caller's loop keeps running.

    :param topic: MQTT topic to publish to.
    :param payload: message body handed to ``publish.single``.
    :return: None
    """
    # Look the logger up by name rather than relying on a module global that
    # only exists when the file is executed as a script.
    log = logging.getLogger('meterears')
    try:
        publish.single(
            topic,
            payload=payload,
            qos=1,
            hostname=os.getenv('MQTT_HOST', 'localhost'),
            # Without a default, int(None) raised TypeError whenever
            # MQTT_PORT was unset and every publish was reported as failed.
            port=int(os.getenv('MQTT_PORT', 1883)),
            auth=None,
        )
        log.info("[mqtt] " + str(topic) + "->" + str(payload))
    except Exception as ex:
        log.error("MQTT Publish Failed: " + str(ex) + " " + str(topic) + " " + str(payload))
def isPortOpen(HOST, PORT):
    """Report whether a TCP connection to HOST:PORT can be established.

    :param HOST: hostname or IPv4 address to probe.
    :param PORT: TCP port, as an int or a numeric string.
    :return: True if the connection attempt succeeded, False otherwise.
    """
    # The context manager closes the probe socket on every path; the socket
    # was previously never closed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # connect_ex() returns 0 on success and an error code otherwise, so
        # "open" means == 0 (the earlier version returned the opposite).
        return sock.connect_ex((HOST, int(PORT))) == 0
# TODO: make this an argument '--rtlamr=...'
# Use the configured rtlamr binary if it exists; otherwise try the local
# development build, and stop the program when neither is present.
_rtlamr_found = os.path.exists(RTLAMR_PATH)
if not _rtlamr_found:
    RTLAMR_PATH = '/home/brandon/projects/golang/bin/rtlamr'
    _rtlamr_found = os.path.exists(RTLAMR_PATH)
if not _rtlamr_found:
    print("rtlamr executable not found")
    sys.exit(1)
# TODO: pass in influxdb vars as method arguments
def send_reading_to_influxdb(reading):
    """Write one meter reading to InfluxDB as a point in the 'power' measurement.

    Connection settings are read from the INFLUXDB_DATABASE, INFLUXDB_USER,
    INFLUXDB_PASSWORD, INFLUXDB_PORT and INFLUXDB_HOST environment variables.
    ``create_database`` is called on every invocation before the point is
    written.

    :param reading: value stored in the 'WATTS' field; anything float() accepts.
    :return: None
    :raises ValueError: if ``reading`` is a string float() cannot parse.
    :raises TypeError: if ``reading`` is of a type float() does not accept.
    """
    # Convert first so a bad reading fails before any connection is set up.
    watts = float(reading)

    dbname = os.getenv('INFLUXDB_DATABASE', 'power')
    user = os.getenv('INFLUXDB_USER', "")
    password = os.getenv('INFLUXDB_PASSWORD', "")
    # Environment values are strings; normalise the port to an int.
    port = int(os.getenv('INFLUXDB_PORT', 8086))
    # NOTE(review): the startup section defaults INFLUXDB_HOST to "localhost"
    # while this function defaults to 10.0.1.9 - confirm which is intended.
    host = os.getenv('INFLUXDB_HOST', '10.0.1.9')

    client = InfluxDBClient(host, port, user, password, dbname)
    try:
        client.create_database(dbname)
        # TODO this is hardcoded isnt it
        json_body = [
            {
                'measurement': 'power',
                'fields': {
                    'WATTS': watts,
                },
                'tags': {
                    'host': 'housemeter',
                },
            }
        ]
        client.write_points(json_body)
    finally:
        # Release the client even when a server call raises.
        client.close()
if __name__ == '__main__':
    # Script entry point: repeatedly run rtlamr against an rtl_tcp server,
    # parse the single JSON message it prints, and publish the meter reading
    # and a derived rate over MQTT.

    # Module-level logger; send_mqtt() logs through the 'meterears' logger.
    logger = logging.getLogger('meterears')
    # Setup Logging
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
    logger.setLevel(logging.INFO)

    RTL_TCP_HOST = os.getenv('RTL_TCP_HOST', '0.0.0.0')
    RTL_TCP_PORT = os.getenv('RTL_TCP_PORT', 1234)
    FILTER_ID = os.getenv('FILTER_ID', "49623078")
    # Parsed once so a malformed READ_DELAY fails at startup, not mid-loop.
    READ_DELAY = int(os.getenv('READ_DELAY', 10))
    INFLUXDB_HOST = os.getenv('INFLUXDB_HOST', "localhost")
    INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', "power")
    INFLUXDB_USER = os.getenv('INFLUXDB_USER', "")
    INFLUXDB_PASSWORD = os.getenv('INFLUXDB_PASSWORD', "")
    INFLUXDB_PORT = os.getenv('INFLUXDB_PORT', 8086)
    MQTT_HOST = os.getenv('MQTT_HOST', "localhost")
    MQTT_PORT = os.getenv('MQTT_PORT', 1883)
    # send_mqtt() reads the broker settings from the environment, so export
    # the defaults chosen here; values that are already set are left alone.
    os.environ.setdefault('MQTT_HOST', str(MQTT_HOST))
    os.environ.setdefault('MQTT_PORT', str(MQTT_PORT))

    print ("- service vars")
    print ("RTL_TCP_HOST: ", RTL_TCP_HOST)
    print ("ARCH: ", ARCH)
    print ("RTLAMR_PATH: ", RTLAMR_PATH)
    print ("RTL_TCP_PORT: ", RTL_TCP_PORT)
    print ("FILTER_ID: ", FILTER_ID)
    print ("- influxdb vars")
    print ("INFLUXDB_HOST: ", INFLUXDB_HOST )
    print ("INFLUXDB_PORT: ", INFLUXDB_PORT )
    print ("INFLUXDB_DATABASE: ", INFLUXDB_DATABASE )
    print ("INFLUXDB_USER: ", INFLUXDB_USER )
    # Do not echo the credential itself to stdout / container logs.
    print ("INFLUXDB_PASSWORD: ", "********" if INFLUXDB_PASSWORD else "" )
    print("--------------------------------")

    INFLUX_ENABLED = False

    # Loop-invariant values, built once.
    SERVER_STRING = "-server=" + RTL_TCP_HOST + ":" + str(RTL_TCP_PORT)
    FILTER_ID_STRING = "-filterid=" + FILTER_ID
    RTLAMR_COMMAND = [
        RTLAMR_PATH,
        SERVER_STRING,
        FILTER_ID_STRING,
        '-single=true',
        '-format=json',
        '-duration=10m',
    ]
    WH_MULTIPLIER = 1000

    # The loop runs forever. The earlier condition was
    # ``isPortOpen(...) or True``: the probe result was discarded, so the
    # probe connection and the unreachable post-loop error exit are dropped.
    while True:
        completed = subprocess.run(
            RTLAMR_COMMAND,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        try:
            data = json.loads(completed.stdout.decode("utf-8"))
            # A JSON document without Message/Consumption is treated like
            # unparsable output instead of crashing the loop.
            reading = data['Message']['Consumption']
        except (ValueError, KeyError, TypeError):
            logger.fatal("Unable to retrieve data from the USB SDR. Is the RTL_TCP process running?")
        else:
            if reading:
                logger.info(("[RTL_TCP] reading: " + str(reading)))
                #if INFLUX_ENABLED:
                #    send_reading_to_influxdb(reading)
                # NOTE(review): the rate is computed from the reading itself,
                # not from the difference between successive readings -
                # confirm this is the intended formula.
                rate = int(reading) * WH_MULTIPLIER * (60.0 / READ_DELAY)
                current_reading_in_kwh = (int(reading) * WH_MULTIPLIER) / 1000
                send_mqtt(
                    'readings/' + str(FILTER_ID) + '/meter_reading',
                    '%s' % str(current_reading_in_kwh)
                )  # readings/49623078/meter_reading
                send_mqtt(
                    'readings/' + str(FILTER_ID) + '/meter_rate',
                    '%s' % str(rate)
                )
            else:
                logger.info("[WARN] blank reading")
        time.sleep(READ_DELAY)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment