Skip to content

Instantly share code, notes, and snippets.

@denpamusic
Last active August 7, 2024 22:19
Show Gist options
  • Save denpamusic/5df805524738ea1bb9c00d25ed351c46 to your computer and use it in GitHub Desktop.
Save denpamusic/5df805524738ea1bb9c00d25ed351c46 to your computer and use it in GitHub Desktop.
Collects ecoMAX device data and dumps it to the json file for diagnostics.
"""Collects device data and dumps it to the JSON file."""
import asyncio
import json
import sys
from typing import Any, Final, Optional, Union
from pyplumio import (
SerialConnection,
TcpConnection,
open_serial_connection,
open_tcp_connection,
)
from pyplumio.devices import Device
# User configuration: choose exactly one connection mode. main() refuses to
# run (and returns 1) unless either HOST and PORT are both set with DEVICE
# left as None, or DEVICE is set with HOST and PORT left as None.
#
# Set host and port if you're using a TCP connection; they are passed to
# open_tcp_connection(host=..., port=...).
HOST: Optional[str] = None
PORT: Optional[int] = None
# OR set device if you're using an RS485 connection; it is passed to
# open_serial_connection(device=...).
DEVICE: Optional[str] = None
# ------------------------------------- #
# Do not edit anything below this line. #
# ------------------------------------- #
# Path of the JSON file that the collected data is written to.
FILENAME: Final = "ecomax_data.json"
# Placeholder that replaces the product UID and the password in the dump.
REDACTED: Final = "**REDACTED**"
# Passed as ``timeout`` to each Device.wait_for() call
# (presumably seconds -- confirm against the pyplumio documentation).
TIMEOUT: Final = 30
def _is_json_serializable(data: Any) -> bool:
"""Check if data is JSON-serializable."""
try:
json.dumps(data)
return True
except TypeError:
return False
def redact_device_data(data: dict[str, Any]) -> dict[str, Any]:
    """Redact sensitive information from the device data.

    When present, the ``uid`` attribute of the ``"product"`` entry and the
    value of the ``"password"`` entry are replaced with ``REDACTED``.

    The passed dictionary is updated in place and also returned. The
    product object itself is not modified: a shallow copy of it is
    redacted and stored in the dictionary instead, so any other holder of
    the original object (for example, the mapping this dictionary was
    copied from) keeps the real UID.

    Args:
        data: Mapping of device data keys to their values.

    Returns:
        The same dictionary with sensitive values replaced.
    """
    import copy  # Local import; only this function needs it.

    if "product" in data:
        product = copy.copy(data["product"])
        product.uid = REDACTED
        data["product"] = product

    if "password" in data:
        data["password"] = REDACTED

    return data
class DeviceDataEncoder(json.JSONEncoder):
    """JSON encoder that describes unknown objects by their type and repr."""

    def default(self, o: Any) -> Any:
        """Return a JSON-serializable stand-in for ``o``.

        Dictionaries and lists are rebuilt with every value or item passed
        through this method, and a ``Device`` is replaced by the converted
        form of its ``data`` attribute. Whatever still cannot be serialized
        afterwards is described by a dictionary holding the string form of
        its type under ``"__type"`` and its ``repr`` under ``"repr"``.
        """
        if isinstance(o, dict):
            converted: Any = {key: self.default(value) for key, value in o.items()}
        elif isinstance(o, list):
            converted = [self.default(item) for item in o]
        elif isinstance(o, Device):
            converted = self.default(o.data)
        else:
            converted = o

        if not _is_json_serializable(converted):
            return {"__type": str(type(converted)), "repr": repr(converted)}
        return converted
async def main() -> int:
    """Collect device data and dump it to the JSON file.

    Opens a serial connection when only DEVICE is set, or a TCP connection
    when only HOST and PORT are set. It then requests the "ecomax" device
    from the connection, awaits its ``wait_for("loaded")`` and
    ``wait_for("regdata")`` calls (each given TIMEOUT as the timeout), and
    writes the redacted device data to FILENAME as indented JSON.

    Returns:
        0 once the data has been written, or 1 if the connection settings
        are missing or ambiguous.
    """
    connection_handler: Union[SerialConnection, TcpConnection]
    if DEVICE is not None and HOST is None and PORT is None:
        connection_handler = open_serial_connection(device=DEVICE)
    elif HOST is not None and PORT is not None and DEVICE is None:
        connection_handler = open_tcp_connection(host=HOST, port=PORT)
    else:
        sys.stdout.write("Set either HOST and PORT for TCP or DEVICE for RS485\n")
        return 1

    async with connection_handler as connection:
        ecomax: Device = await connection.get("ecomax")
        sys.stdout.write("Collecting data, please wait...\n")
        await asyncio.gather(
            ecomax.wait_for("loaded", timeout=TIMEOUT),
            ecomax.wait_for("regdata", timeout=TIMEOUT),
        )

        # Dump collected data to the file.
        sys.stdout.write(f"Saving collected data to {FILENAME}...\n")
        # Redact and serialize before opening the file: mode "w" truncates
        # it immediately, so a failure in either step would otherwise leave
        # an empty file behind (or wipe a previous dump).
        data = redact_device_data(dict(ecomax.data))
        payload = json.dumps(data, cls=DeviceDataEncoder, indent=2)
        with open(FILENAME, "w", encoding="UTF-8") as file:
            file.write(payload)

    sys.stdout.write("All done!\n")
    return 0
# Run the collector only when this file is executed as a script, so that
# importing the module neither opens a connection nor exits the interpreter.
# The process exit status is the value returned by main().
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment