Created
October 8, 2017 13:18
-
-
Save h3po/d6768c2787254291b358cb4a6f221154 to your computer and use it in GitHub Desktop.
Simple prometheus exporter for temperature and power sensor data from a Corsar HX750i (and possibly other HXi and RMi series) power supply. Based on OpenCorsairLink by audiohacked
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
2017-10 by [email protected] | |
using protocol reverse engineered by audiohacked: https://github.com/audiohacked/OpenCorsairLink | |
""" | |
import usb1, struct, threading | |
from http.server import HTTPServer | |
from client_python.prometheus_client.core import Metric, REGISTRY | |
from client_python.prometheus_client.exposition import MetricsHandler | |
#HX750i | |
VID = 0x1b1c | |
PID = 0x1c05 | |
class UpdatingGauge(Metric): | |
def __init__(self, name, documentation, labelnames): | |
super().__init__(name, documentation, "gauge") | |
self._samples = {} | |
self.samples = [] | |
self._labelnames = labelnames | |
def new_sample(self, labelvalues, value): | |
self._samples[labelvalues] = value | |
def update(self): | |
self.samples = ((self.name, dict(zip(self._labelnames, labelvalues)), value) for labelvalues, value in self._samples.items()) | |
class CorsairPsuCollector(object): | |
def __init__(self, vid, pid, ep=1, metric_prefix="corsairpsu"): | |
#usb init | |
self.ep = ep | |
self.context = usb1.USBContext() | |
self.handle = self.context.openByVendorIDAndProductID(vid, pid, skip_on_error=True) | |
if self.handle is None: raise Exception("Could not aquire device handle") | |
self.packetsize = self.handle.getDevice().getMaxPacketSize(self.ep) | |
#metrics init | |
self.metric_prefix = metric_prefix | |
self.common_labels = ("name", "vendor", "product") | |
self.common_label_values = ( | |
self._getValue(b"\xfe\x03", self._readString), | |
self._getValue(b"\x03\x99", self._readString), | |
self._getValue(b"\x03\x9a", self._readString)) | |
self.gauges = { | |
"temperature": UpdatingGauge("%s_temperature_celsius" % self.metric_prefix, | |
"temperature in degrees celsius at sensor $sensor", | |
self.common_labels + ("sensor",)), | |
"voltage": UpdatingGauge("%s_voltage_volt" % self.metric_prefix, | |
"voltage in volts at rail $rail", | |
self.common_labels + ("rail",)), | |
"current": UpdatingGauge("%s_current_ampere" % self.metric_prefix, | |
"current in amperes at rail $rail", | |
self.common_labels + ("rail",)), | |
"power": UpdatingGauge("%s_power_watt" % self.metric_prefix, | |
"power in watts at rail $rail", | |
self.common_labels + ("rail",))} | |
@staticmethod | |
def _bytesToFloat(b): | |
assert len(b) == 2 | |
tmp = struct.unpack("H", b)[0] | |
exponent = tmp >> 11 | |
fraction = tmp & 2047 | |
if exponent > 15: exponent = -(32-exponent) | |
if fraction > 1023: fraction = -(2048-fraction) | |
if fraction & 1: fraction += 1 | |
return float(fraction * pow(2, exponent)) | |
def _read(self): | |
return self.handle.interruptRead(self.ep, self.packetsize, timeout=3) | |
def _write(self, data): | |
assert len(data) <= self.packetsize | |
return self.handle.interruptWrite(self.ep, data) | |
def _readString(self): | |
return self._read()[2:].decode("ascii").split("\x00")[0] | |
def _readFloat(self): | |
return self._bytesToFloat(self._read()[2:4]) | |
def _selectRail(self, rail): | |
self._write(b"\x02\x00" + bytes((rail,))) | |
def _getValue(self, command, readfunction): | |
self._write(command) | |
return readfunction() | |
def collect(self): | |
self.gauges["temperature"].new_sample(self.common_label_values + ("0",), self._getValue(b"\x03\x8d", self._readFloat)) | |
self.gauges["temperature"].new_sample(self.common_label_values + ("1",), self._getValue(b"\x03\x8e", self._readFloat)) | |
self.gauges["voltage"].new_sample(self.common_label_values + ("input",), self._getValue(b"\x03\x88", self._readFloat)) | |
for railindex, railname in enumerate(("12V", "5V", "3V3")): | |
self._selectRail(railindex) | |
self.gauges["voltage"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x8b", self._readFloat)) | |
self.gauges["current"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x8c", self._readFloat)) | |
self.gauges["power"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x96", self._readFloat)) | |
for gauge in self.gauges.values(): | |
gauge.update() | |
yield gauge | |
def start_http_server(port, addr=''): | |
"""Starts a HTTP server for prometheus metrics as a daemon thread.""" | |
class PrometheusMetricsServer(threading.Thread): | |
def run(self): | |
httpd = HTTPServer((addr, port), MetricsHandler) | |
httpd.serve_forever() | |
t = PrometheusMetricsServer() | |
t.start() | |
return t | |
if __name__ == "__main__": | |
REGISTRY.register(CorsairPsuCollector(VID, PID)) | |
start_http_server(9666).join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment