Last active
August 7, 2024 22:19
-
-
Save denpamusic/5df805524738ea1bb9c00d25ed351c46 to your computer and use it in GitHub Desktop.
Collects ecoMAX device data and dumps it to a JSON file for diagnostics.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Collects device data and dumps it to the JSON file.""" | |
import asyncio | |
import json | |
import sys | |
from typing import Any, Final, Optional, Union | |
from pyplumio import ( | |
SerialConnection, | |
TcpConnection, | |
open_serial_connection, | |
open_tcp_connection, | |
) | |
from pyplumio.devices import Device | |
# User-editable connection settings. main() accepts exactly one of the two
# configurations: HOST and PORT both set with DEVICE unset, or DEVICE set
# with HOST and PORT unset. Anything else makes main() print a hint and
# return 1.

# Set host and port if you're using TCP connection.
HOST: Optional[str] = None
PORT: Optional[int] = None

# OR set device if you're using RS485 connection.
DEVICE: Optional[str] = None

# ------------------------------------- #
# Do not edit anything below this line. #
# ------------------------------------- #

# Name of the file the collected data is written to.
FILENAME: Final = "ecomax_data.json"

# Placeholder written in place of sensitive values.
REDACTED: Final = "**REDACTED**"

# Passed as ``timeout`` to each ``wait_for`` call in main().
# NOTE(review): presumably seconds -- confirm against the pyplumio docs.
TIMEOUT: Final = 30
def _is_json_serializable(data: Any) -> bool: | |
"""Check if data is JSON-serializable.""" | |
try: | |
json.dumps(data) | |
return True | |
except TypeError: | |
return False | |
def redact_device_data(data: dict[str, Any]) -> dict[str, Any]:
    """Redact sensitive information from the device data.

    Two entries are treated as sensitive when present:

    * ``"product"`` -- its ``uid`` attribute is replaced with ``REDACTED``;
    * ``"password"`` -- the value itself is replaced with ``REDACTED``.

    The passed dictionary is updated in place and also returned. The
    product object is never modified: a shallow copy of it is redacted
    and stored back under the ``"product"`` key, so any other holder of
    the original object (such as the device the data was copied from)
    still sees the real UID.
    """
    import copy  # Local import keeps this function self-contained.

    if "product" in data:
        # A shallow dict copy made by the caller still shares this object
        # with its source, so redact a copy instead of the original.
        redacted_product = copy.copy(data["product"])
        redacted_product.uid = REDACTED
        data["product"] = redacted_product

    if "password" in data:
        data["password"] = REDACTED

    return data
class DeviceDataEncoder(json.JSONEncoder):
    """JSON encoder used for dumping device data."""

    def default(self, o: Any) -> Any:
        """Return a JSON-friendly replacement for ``o``.

        Dictionaries and lists are rebuilt with every value passed
        through this method again, and a ``Device`` is replaced by the
        encoded form of its ``data`` attribute. If the outcome can be
        serialized it is returned as is; otherwise a dictionary holding
        the type name and ``repr()`` of the value is returned instead.
        """
        candidate: Any
        if isinstance(o, dict):
            candidate = {key: self.default(value) for key, value in o.items()}
        elif isinstance(o, list):
            candidate = [self.default(item) for item in o]
        elif isinstance(o, Device):
            candidate = self.default(o.data)
        else:
            candidate = o

        if not _is_json_serializable(candidate):
            # Last resort: describe the value rather than fail the dump.
            return {"__type": str(type(candidate)), "repr": repr(candidate)}

        return candidate
async def main() -> int:
    """Collect device data and dump it to the JSON file.

    Opens a serial connection when only ``DEVICE`` is set, or a TCP
    connection when only ``HOST`` and ``PORT`` are set. Any other
    combination prints a hint and returns ``1``.

    Once connected, waits for the ``loaded`` and ``regdata`` values of
    the ``ecomax`` device, then writes the redacted device data to
    ``FILENAME`` and returns ``0``.
    """
    use_serial = DEVICE is not None and HOST is None and PORT is None
    use_tcp = HOST is not None and PORT is not None and DEVICE is None

    if not (use_serial or use_tcp):
        sys.stdout.write("Set either HOST and PORT for TCP or DEVICE for RS485\n")
        return 1

    connection_handler: Union[SerialConnection, TcpConnection] = (
        open_serial_connection(device=DEVICE)
        if use_serial
        else open_tcp_connection(host=HOST, port=PORT)
    )

    async with connection_handler as connection:
        ecomax: Device = await connection.get("ecomax")
        sys.stdout.write("Collecting data, please wait...\n")

        # Wait for both values concurrently, each with its own timeout.
        pending = [
            ecomax.wait_for(name, timeout=TIMEOUT) for name in ("loaded", "regdata")
        ]
        await asyncio.gather(*pending)

        # Dump collected data to the file.
        sys.stdout.write(f"Saving collected data to {FILENAME}...\n")
        with open(FILENAME, "w", encoding="UTF-8") as file:
            # Redact a shallow copy of the device data, not the mapping itself.
            snapshot = redact_device_data(dict(ecomax.data))
            file.write(json.dumps(snapshot, cls=DeviceDataEncoder, indent=2))

    sys.stdout.write("All done!\n")
    return 0
# Run the collector and use the value returned by main() as the exit status.
sys.exit(asyncio.run(main()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment