Skip to content

Instantly share code, notes, and snippets.

@mitchellrj
Last active February 22, 2025 20:46
Show Gist options
  • Save mitchellrj/86722a2120d66abefd7d6eecc390b4fc to your computer and use it in GitHub Desktop.
Save mitchellrj/86722a2120d66abefd7d6eecc390b4fc to your computer and use it in GitHub Desktop.
Basic Prometheus exporter and REST API for a single sensor from the LeChacal RPICT hat.
#!/usr/bin/env python3
#
# Usage: rpict_exporter.py DEVICE DEVICE_TYPE [HOST] [PORT] [SENSOR NAMES...]
# Example usage: rpict_exporter.py /dev/ttyAMA0 RPICT3T1 0.0.0.0 9999
#
# MIT License
#
# Copyright (c) 2021 Richard Mitchell
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice (including the next paragraph)
# shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import enum
import json
import logging
import signal
import sys
import threading
import time
import flask
import RPi.GPIO as GPIO
import serial
# Module-wide handle to the running Reader. It is assigned in the
# ``__main__`` block and read by the Flask view functions. A ``global``
# statement has no effect at module scope, so a plain assignment suffices.
reader = None

# The root logger is used throughout this file; let INFO and above through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Disable werkzeug's own logger so it emits nothing.
logging.getLogger("werkzeug").disabled = True
class DeviceType(enum.Enum):
    """Board models this exporter knows how to label.

    Every member's value is identical to its name, so a model string taken
    from the command line can be turned into a member with
    ``DeviceType('RPICT3T1')``.
    """

    # Models whose name starts with "RPICT".
    RPICT3V1 = 'RPICT3V1'
    RPICT3T1 = 'RPICT3T1'
    RPICT4T4 = 'RPICT4T4'
    RPICT7V1 = 'RPICT7V1'
    RPICT4V3 = 'RPICT4V3'
    RPICT8 = 'RPICT8'

    # Models whose name starts with "RPIZ".
    RPIZCT4V3T2 = 'RPIZCT4V3T2'
    RPIZ_CT3V1 = 'RPIZ_CT3V1'
    RPIZ_CT3T1 = 'RPIZ_CT3T1'
def _channels(metric_name, count=1, **extra_labels):
    """Build ``count`` label dicts for consecutive readings of one metric.

    ``__name__`` carries the metric name and always comes first; any
    ``extra_labels`` follow it in the order given.
    """
    return [{'__name__': metric_name, **extra_labels} for _ in range(count)]


# For each device type: one label dict per value on a serial line, indexed by
# the value's position on that line.
DEVICE_TYPE_LABELS = {
    DeviceType.RPICT3V1: (
        _channels('power_w', 3)
        + _channels('irms_ma', 3)
        + _channels('vrms_v')
    ),
    DeviceType.RPICT3T1: (
        _channels('estimated_power_w', 3)
        + _channels('temperature_c')
    ),
    DeviceType.RPICT4T4: (
        _channels('estimated_power_w', 4)
        + _channels('temperature_c', 4)
    ),
    DeviceType.RPICT7V1: (
        _channels('power_w', 7)
        + _channels('irms_ma', 7)
        + _channels('vrms_v')
    ),
    DeviceType.RPICT4V3: (
        _channels('vrms_v', 4)
        + _channels('power_w', 4)
        + _channels('irms_ma', 4)
        + _channels('power_factor', 4)
    ),
    DeviceType.RPICT8: _channels('estimated_power_w', 8),
    DeviceType.RPIZCT4V3T2: (
        _channels('vrms_v', 4)
        + _channels('power_w', 4)
        + _channels('power_factor', 4)
        + _channels('irms_ma', 4)
        + _channels('temperature_c', sensor_type='RTD_thermocouple')
        # Last entry: the metrics generator reuses the final entry of a
        # device's list for readings that run past the end of the list, so
        # surplus values on this board are reported as DS18B20 temperatures.
        + _channels('temperature_c', sensor_type='DS18B20')
    ),
}

# The RPIZ_CT3* models reuse the label layout (the very same list object) of
# the correspondingly named RPICT3* models.
DEVICE_TYPE_LABELS[DeviceType.RPIZ_CT3V1] = DEVICE_TYPE_LABELS[DeviceType.RPICT3V1]
DEVICE_TYPE_LABELS[DeviceType.RPIZ_CT3T1] = DEVICE_TYPE_LABELS[DeviceType.RPICT3T1]
# Defaults used by the ``__main__`` block when a command-line argument is
# omitted (timeout and polling have no command-line override).

# Serial device the reader opens.
DEFAULT_DEVICE = '/dev/ttyAMA0'
# Selects which entry of DEVICE_TYPE_LABELS is used to label the readings.
DEFAULT_DEVICE_TYPE = DeviceType.RPICT3V1
# Address and port handed to the Flask server.
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 9999
# Seconds: used both as the serial read timeout and as the amount of drift
# from the polling schedule that is tolerated before the hardware is reset.
DEFAULT_TIMEOUT = 5
# Milliseconds between lines expected from the board (the reader divides this
# by 1000 before comparing it with elapsed seconds).
DEFAULT_POLLING = 2000
def reset_hardware(rst_pin=4, pulse_seconds=0.5):
    """Pulse a GPIO pin low and back high to reset the board.

    Args:
        rst_pin: BCM number of the GPIO pin to pulse. Defaults to 4, the pin
            this script has always used.
        pulse_seconds: How long the pin is held low before being driven high
            again. Defaults to 0.5 seconds.
    """
    # Pin numbers are interpreted using the BCM scheme.
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    GPIO.setup(rst_pin, GPIO.OUT)
    GPIO.output(rst_pin, GPIO.LOW)
    time.sleep(pulse_seconds)
    GPIO.output(rst_pin, GPIO.HIGH)
class Reader(threading.Thread):
    """Background thread that reads sensor lines from a serial port.

    Each line is expected to be ASCII text made of a node id followed by
    whitespace-separated numbers. The most recent successfully parsed line is
    kept in ``node_id`` / ``values`` / ``last_updated`` and can be rendered
    with :meth:`prometheus_metrics` or :meth:`json`.
    """

    def __init__(self, device, device_type, sensor_names, polling, timeout):
        """Store the configuration and install signal handlers.

        Args:
            device: Serial device to open, e.g. ``/dev/ttyAMA0``.
            device_type: Key into ``DEVICE_TYPE_LABELS`` describing the board.
            sensor_names: Optional per-reading names; missing or empty entries
                fall back to ``sensor_<n>``.
            polling: Expected interval between lines, in milliseconds.
            timeout: Serial read timeout in seconds; also the drift from the
                polling schedule tolerated before the hardware is reset.
        """
        threading.Thread.__init__(self)
        self.device = device
        self.polling = polling
        self.device_type = device_type
        self.sensor_names = sensor_names
        self.timeout = timeout
        self.labels = []
        self.node_id = None
        self.last_updated = None
        self.values = []
        # Opened in run(); kept as None until then so stop() can be called
        # safely before the port exists.
        self.ser = None
        self._exit = False
        # signal.signal() returns the handler that was installed before, which
        # stop() chains to after doing its own cleanup.
        self._super_signal_handlers = {
            signal.SIGINT: signal.signal(signal.SIGINT, self.stop),
            signal.SIGTERM: signal.signal(signal.SIGTERM, self.stop),
            signal.SIGHUP: signal.signal(signal.SIGHUP, self.stop),
        }

    def run(self):
        """Read lines until stopped or until the serial port is closed."""
        self._exit = False
        self.ser = serial.Serial(self.device, 38400, timeout=self.timeout)
        start = time.time()
        count = 0
        try:
            while self.ser.is_open:
                if self._exit:
                    logger.info('Reader thread stop requested')
                    break
                line = self.ser.readline()
                if not line:
                    # Read timed out (or was cancelled) without any data.
                    logger.debug('Missed readline')
                    continue
                count += 1
                # Difference between the time actually elapsed and the time
                # ``count`` lines should have taken at the polling interval.
                behind = (time.time() - start) - (count * self.polling / 1000)
                if abs(behind) > self.timeout:
                    logger.warning(
                        'Reading from serial port lagging behind polling '
                        'frequency (%s seconds). Is the system CPU load too '
                        'high? Resetting hardware.',
                        behind,
                    )
                    reset_hardware()
                    # Re-baseline the schedule; otherwise the accumulated
                    # drift would trigger a reset on every following line.
                    start = time.time()
                    count = 0
                    continue
                try:
                    parts = line.decode('ascii').strip().split()
                    node_id = parts[0]
                    values = [float(part) for part in parts[1:]]
                except (UnicodeDecodeError, ValueError, IndexError):
                    # A garbled line must not kill the thread; skip it and
                    # keep the previous reading.
                    logger.warning(
                        'Discarding malformed line from serial port: %r', line
                    )
                    continue
                # Publish only once the whole line has been parsed.
                self.node_id = node_id
                self.values = values
                self.last_updated = time.time()
            else:
                logger.info('Serial port closed unexpectedly')
        finally:
            self.ser.close()
        logger.info('Reader thread stopped')

    def stop(self, s=None, f=None):
        """Stop the thread, close the port and reset the hardware.

        Usable both as a plain method (no arguments) and as a signal handler.

        Args:
            s: Signal number when invoked as a signal handler, else ``None``.
            f: Stack frame passed by the signal machinery, else ``None``.
        """
        if s is not None:
            logger.info('Received {}, reader thread stopping'.format(s))
        self._exit = True
        ser = self.ser
        if ser is not None:
            # Wake up a readline() blocked in the reader thread.
            ser.cancel_read()
            ser.close()
        reset_hardware()
        if s is not None:
            # Chain to whatever handler was installed before this one. This is
            # done last because that handler may raise (Python's default
            # SIGINT handler raises KeyboardInterrupt), which would otherwise
            # skip the cleanup above. SIG_DFL / SIG_IGN / None are not
            # callable and are skipped.
            previous = self._super_signal_handlers.get(s)
            if callable(previous):
                previous(s, f)

    @staticmethod
    def _escape_label_value(value):
        """Escape backslash, double quote and newline in a label value."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    def prometheus_metrics(self):
        """Yield the latest reading as Prometheus text-format sample lines.

        Yields nothing until a first line has been read. Otherwise yields one
        sample per value, labelled from ``DEVICE_TYPE_LABELS`` plus ``name``
        and ``node_id``, followed by a single ``last_updated`` sample. Values
        beyond the end of the device's label list reuse its last entry.
        """
        node_id = self.node_id
        if node_id is None:
            return
        device_labels = DEVICE_TYPE_LABELS[self.device_type]
        last_updated = self.last_updated
        for i, value in enumerate(self.values):
            if i >= len(device_labels):
                template = device_labels[-1]
            else:
                template = device_labels[i]
            # Copy so the shared label table is never modified.
            labels = dict(template)
            if i < len(self.sensor_names) and self.sensor_names[i]:
                labels['name'] = self.sensor_names[i]
            else:
                labels['name'] = 'sensor_{}'.format(i + 1)
            labels['node_id'] = node_id
            metric_name = labels.pop('__name__')
            yield '{metric_name}{{{labels}}} {value}'.format(
                metric_name=metric_name,
                value=value,
                labels=','.join(
                    '{}="{}"'.format(k, self._escape_label_value(v))
                    for k, v in labels.items()
                )
            )
        yield 'last_updated{{node_id="{}"}} {}'.format(
            self._escape_label_value(node_id), last_updated
        )

    def json(self):
        """Return the latest reading as a JSON string.

        Returns ``'{}'`` until a first line has been read; afterwards an
        object with ``last_updated``, ``node_id`` and ``sensors`` keys.
        """
        if self.node_id is None:
            return '{}'
        return json.dumps({
            'last_updated': self.last_updated,
            'node_id': self.node_id,
            'sensors': self.values
        })
# Flask application on which the /shutdown, /metrics and / routes are registered.
app = flask.Flask(__name__)
@app.route('/shutdown')
def shutdown():
    """Stop the reader, then invoke the server's shutdown hook if present.

    The hook is looked up in the WSGI environ under
    ``werkzeug.server.shutdown``; when the key is absent nothing further is
    called and only the reader is stopped.
    """
    reader.stop()
    environ = flask.request.environ
    if 'werkzeug.server.shutdown' in environ:
        environ['werkzeug.server.shutdown']()
    return 'Shutting down...'
@app.route('/metrics')
def prometheus():
    """Serve the latest reading in the Prometheus text exposition format.

    Returns a ``text/plain`` response with one sample per line, each line
    terminated by a line feed. If producing the metrics fails, the error is
    logged and an empty 503 response is returned instead.
    """
    try:
        # Generating the samples is the step that can fail, so it belongs
        # inside the try block.
        lines = list(reader.prometheus_metrics())
        # The exposition format requires the last line to end with '\n'.
        response = flask.make_response(''.join(line + '\n' for line in lines))
    except Exception as e:
        logger.exception(e)
        response = flask.make_response('')
        response.status_code = 503
    response.headers['Content-Type'] = 'text/plain'
    return response
@app.route('/')
def rest():
    """Serve the latest reading as ``application/json``.

    On any failure the exception is logged and an empty JSON object is
    returned with status 503.
    """
    try:
        reply = flask.make_response(reader.json())
    except Exception as err:
        logger.exception(err)
        reply = flask.make_response(json.dumps({}))
        reply.status_code = 503
    reply.headers['Content-Type'] = 'application/json'
    return reply
if __name__ == '__main__':
    # Positional arguments, all optional:
    #   DEVICE DEVICE_TYPE HOST PORT [SENSOR NAMES...]
    args = sys.argv[1:]

    device = args[0] if len(args) > 0 else DEFAULT_DEVICE

    device_type = DEFAULT_DEVICE_TYPE
    if len(args) > 1:
        try:
            device_type = DeviceType(args[1])
        except ValueError:
            # Exit with a readable message instead of a traceback.
            sys.exit('Unknown device type {!r}; expected one of: {}'.format(
                args[1], ', '.join(t.value for t in DeviceType)))

    host = args[2] if len(args) > 2 else DEFAULT_HOST

    port = DEFAULT_PORT
    if len(args) > 3:
        try:
            # Convert here so the server always receives an integer port.
            port = int(args[3])
        except ValueError:
            sys.exit('Invalid port {!r}; expected an integer'.format(args[3]))

    # Everything after PORT names the sensors, in reading order.
    sensor_names = args[4:]
    polling = DEFAULT_POLLING

    reader = Reader(device, device_type, sensor_names, polling, timeout=DEFAULT_TIMEOUT)
    reader.start()
    app.run(host=host, port=port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment