Created
December 20, 2020 13:53
-
-
Save shbatm/d34d33fb2462dd020b8dcc6392f0f6fe to your computer and use it in GitHub Desktop.
Purge Device Tracker Entries from Home Assistant Storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/local/bin/python3 | |
# WARNING: USE AT YOUR OWN RISK, NOT RESPONSIBLE FOR BOTCHED HOME ASSISTANT CONFIGURATIONS | |
# Change the two lines below and run with `python3 device_tracker_purge.py` | |
PATH = "/config/.storage/core.entity_registry" | |
PURGE_PATTERN = "^device_tracker\." | |
import json | |
import logging | |
import os | |
import re | |
import sys | |
import tempfile | |
from collections import deque | |
from typing import Any, Dict, List, Optional, Type, Union | |
_LOGGER = logging.getLogger(__name__) | |
def parse_entities(): | |
registry_path = f"{PATH}" | |
data = load_json(registry_path) | |
purged_entities = [ | |
e | |
for e in data["data"]["entities"] | |
if not re.search(PURGE_PATTERN, e["entity_id"]) | |
] | |
_LOGGER.warning( | |
"Original entities: %s; after purging: %s", | |
len(data["data"]["entities"]), | |
len(purged_entities), | |
) | |
data["data"]["entities"] = purged_entities | |
save_json(registry_path, data) | |
def load_json( | |
filename: str, default: Union[List, Dict, None] = None | |
) -> Union[List, Dict]: | |
"""Load JSON data from a file and return as dict or list. | |
Defaults to returning empty dict if file is not found. | |
""" | |
try: | |
with open(filename, encoding="utf-8") as fdesc: | |
return json.loads(fdesc.read()) # type: ignore | |
except FileNotFoundError: | |
# This is not a fatal error | |
_LOGGER.debug("JSON file not found: %s", filename) | |
except ValueError as error: | |
_LOGGER.exception("Could not parse JSON content: %s", filename) | |
raise ValueError(error) | |
except OSError as error: | |
_LOGGER.exception("JSON file reading failed: %s", filename) | |
raise ValueError(error) | |
return {} if default is None else default | |
def save_json( | |
filename: str, | |
data: Union[List, Dict], | |
private: bool = False, | |
*, | |
encoder: Optional[Type[json.JSONEncoder]] = None, | |
) -> None: | |
"""Save JSON data to a file. | |
Returns True on success. | |
""" | |
try: | |
json_data = json.dumps(data, sort_keys=True, indent=4, cls=encoder) | |
except TypeError: | |
# pylint: disable=no-member | |
msg = f"Failed to serialize to JSON: {filename}. Bad data found at {', '.join(find_paths_unserializable_data(data))}" | |
_LOGGER.error(msg) | |
raise ValueError(msg) | |
tmp_filename = "" | |
tmp_path = os.path.split(filename)[0] | |
try: | |
# Modern versions of Python tempfile create this file with mode 0o600 | |
with tempfile.NamedTemporaryFile( | |
mode="w", encoding="utf-8", dir=tmp_path, delete=False | |
) as fdesc: | |
fdesc.write(json_data) | |
tmp_filename = fdesc.name | |
if not private: | |
os.chmod(tmp_filename, 0o644) | |
os.replace(tmp_filename, filename) | |
except OSError as error: | |
_LOGGER.exception("Saving JSON file failed: %s", filename) | |
raise OSError(error) | |
finally: | |
if os.path.exists(tmp_filename): | |
try: | |
os.remove(tmp_filename) | |
except OSError as err: | |
# If we are cleaning up then something else went wrong, so | |
# we should suppress likely follow-on errors in the cleanup | |
_LOGGER.error("JSON replacement cleanup failed: %s", err) | |
def find_paths_unserializable_data(bad_data: Any) -> List[str]: | |
"""Find the paths to unserializable data. | |
This method is slow! Only use for error handling. | |
""" | |
to_process = deque([(bad_data, "$")]) | |
invalid = [] | |
while to_process: | |
obj, obj_path = to_process.popleft() | |
try: | |
json.dumps(obj) | |
continue | |
except TypeError: | |
pass | |
if isinstance(obj, dict): | |
for key, value in obj.items(): | |
try: | |
# Is key valid? | |
json.dumps({key: None}) | |
except TypeError: | |
invalid.append(f"{obj_path}<key: {key}>") | |
else: | |
# Process value | |
to_process.append((value, f"{obj_path}.{key}")) | |
elif isinstance(obj, list): | |
for idx, value in enumerate(obj): | |
to_process.append((value, f"{obj_path}[{idx}]")) | |
else: | |
invalid.append(obj_path) | |
return invalid | |
def main(arguments): | |
"""Execute primary loop.""" | |
fmt = "%(asctime)s %(levelname)s [%(name)s] %(message)s" | |
datefmt = "%Y-%m-%d %H:%M:%S" | |
logging.basicConfig(format=fmt, datefmt=datefmt, level=logging.DEBUG) | |
parse_entities() | |
if __name__ == "__main__": | |
sys.exit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment