Created
May 8, 2025 14:43
-
-
Save Roadmaster/2de2beffffec21252e78f91df9040ba2 to your computer and use it in GitHub Desktop.
Simple Prometheus exporter for Aranet4 data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# requires-python = ">=3.11" | |
# dependencies = [ | |
# "aiohttp", | |
# "aranet4", | |
# ] | |
# /// | |
# | |
# Simple prometheus exporter for aranet4 data. | |
# Set your aranet4's BT address in the ARANET_MAC environment variable | |
# Exposes Prometheus metrics on port 8080 | |
import os | |
import time | |
import aranet4 | |
from aiohttp import web | |
import asyncio | |
import logging | |
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Bluetooth address of the Aranet4 device to poll (required).
device_mac = os.environ.get("ARANET_MAC", None)
if not device_mac:
    raise SystemExit("Set the aranet4 address in ARANET_MAC environment variable")

# Seconds to wait between device polls (default: 5 minutes).
# Environment values are always strings, so convert to a number here;
# passing the raw string on to asyncio.sleep() would raise TypeError.
try:
    aranet_frequency = float(os.environ.get("ARANET_FREQUENCY_SECONDS", 5 * 60))
except ValueError:
    raise SystemExit(
        "ARANET_FREQUENCY_SECONDS must be a number of seconds"
    ) from None
# Written as "not > 0" so that NaN is rejected as well as zero/negatives;
# a non-positive delay would turn the polling loop into a busy loop.
if not aranet_frequency > 0:
    raise SystemExit("ARANET_FREQUENCY_SECONDS must be greater than zero")

# Most recent successful reading, or None when no reading is available.
last_reading = None
async def read_aranet():
    """Fetch the current reading from the device at ``device_mac``.

    Returns the result of ``aranet4.client._current_reading``, or ``None``
    when that call raises; the exception is logged with its traceback.
    """
    try:
        current = await aranet4.client._current_reading(device_mac)
    except Exception as err:
        # Best-effort poll: record the failure and report "no reading".
        logger.exception(err)
        current = None
    return current
async def periodic(app):
    """Poll the device forever, publishing each result in ``last_reading``.

    Every iteration replaces ``last_reading`` with the outcome of
    ``read_aranet()`` (``None`` on failure), then sleeps for
    ``aranet_frequency`` seconds. ``app`` is accepted but not used.
    """
    global last_reading
    while True:
        logger.info(f"Querying device {device_mac}")
        last_reading = await read_aranet()
        # Log only whether a reading was obtained, not its contents.
        logger.info(f"Queried device: {bool(last_reading)}")
        await asyncio.sleep(aranet_frequency)
# Strong reference to the background polling task. The event loop keeps only
# weak references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_periodic_task = None


async def start_periodic(app):
    """Startup hook: launch the background polling loop.

    The created task is kept in ``_periodic_task`` so it stays alive and can
    be cancelled by ``stop_periodic``.
    """
    global _periodic_task
    _periodic_task = asyncio.create_task(periodic(app))


async def stop_periodic(app):
    """Cleanup hook: cancel the polling task and wait for it to finish.

    Does nothing if the task was never started.
    """
    global _periodic_task
    task, _periodic_task = _periodic_task, None
    if task is None:
        return
    task.cancel()
    # return_exceptions=True absorbs the task's CancelledError (or any error
    # the task already died with) so application cleanup is not interrupted.
    await asyncio.gather(task, return_exceptions=True)
def _escape_label_value(value):
    """Escape ``value`` for use inside a double-quoted Prometheus label."""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


async def handle_aranet(request):
    """Serve the latest reading in the Prometheus text exposition format.

    Returns an empty body when no reading is available. Otherwise emits one
    sample each for CO2, temperature, humidity and pressure, labelled with
    the device address and name.

    Label values are double-quoted and escaped, and the body ends with a
    newline, as the exposition format requires.
    """
    reading = last_reading
    if not reading:
        return web.Response(text="")

    # NOTE: the sample timestamp is the time of this request, not the time
    # the reading was taken from the device.
    time_ms = time.time_ns() // 1_000_000
    labels = (
        f'device_mac="{_escape_label_value(device_mac)}",'
        f'name="{_escape_label_value(reading.name)}"'
    )
    metrics = (
        ("air_monitor_co2_ppm", reading.co2),
        ("air_monitor_temperature_c", reading.temperature),
        ("air_monitor_humidity_rel_percent", reading.humidity),
        ("air_monitor_pressure_hpa", reading.pressure),
    )
    samples = [
        f"{metric}{{{labels}}} {value} {time_ms}" for metric, value in metrics
    ]
    return web.Response(text="\n".join(samples) + "\n")
# Build the web application: the polling loop is tied to the application's
# startup/cleanup signals and the metrics handler is served at "/".
app = web.Application()
app.on_startup.append(start_periodic)
app.on_cleanup.append(stop_periodic)
routes = [web.get("/", handle_aranet)]
app.add_routes(routes)


if __name__ == "__main__":
    # Run the server only when executed as a script, not when imported.
    web.run_app(app)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment