Skip to content

Instantly share code, notes, and snippets.

@shbatm
Created December 20, 2020 13:53
Show Gist options
  • Save shbatm/d34d33fb2462dd020b8dcc6392f0f6fe to your computer and use it in GitHub Desktop.
Save shbatm/d34d33fb2462dd020b8dcc6392f0f6fe to your computer and use it in GitHub Desktop.
Purge Device Tracker Entries from Home Assistant Storage
#! /usr/local/bin/python3
# WARNING: USE AT YOUR OWN RISK, NOT RESPONSIBLE FOR BOTCHED HOME ASSISTANT CONFIGURATIONS
# Change the two lines below and run with `python3 device_tracker_purge.py`

# Path to the Home Assistant entity registry storage file.
PATH = "/config/.storage/core.entity_registry"
# Regex matched against each entity_id; matching entities are purged.
# Raw string: "\." in a plain string is an invalid escape sequence
# (DeprecationWarning, SyntaxError in future Python versions).
PURGE_PATTERN = r"^device_tracker\."

import json
import logging
import os
import re
import sys
import tempfile
from collections import deque
from typing import Any, Dict, List, Optional, Type, Union

_LOGGER = logging.getLogger(__name__)
def parse_entities():
    """Remove all entities matching PURGE_PATTERN from the entity registry.

    Loads the registry JSON from PATH, drops every entity whose entity_id
    matches PURGE_PATTERN, logs the before/after counts, and writes the
    result back atomically via save_json.
    """
    # PATH is already a plain string; the original f"{PATH}" wrap was a no-op.
    data = load_json(PATH)
    kept_entities = [
        entity
        for entity in data["data"]["entities"]
        if not re.search(PURGE_PATTERN, entity["entity_id"])
    ]
    _LOGGER.warning(
        "Original entities: %s; after purging: %s",
        len(data["data"]["entities"]),
        len(kept_entities),
    )
    data["data"]["entities"] = kept_entities
    save_json(PATH, data)
def load_json(
    filename: str, default: Union[List, Dict, None] = None
) -> Union[List, Dict]:
    """Load JSON data from a file and return it as a dict or list.

    Args:
        filename: Path of the JSON file to read.
        default: Value returned when the file does not exist; defaults
            to an empty dict.

    Returns:
        The parsed JSON content, or ``default`` if the file is missing.

    Raises:
        ValueError: If the file cannot be read or its content parsed.
    """
    try:
        with open(filename, encoding="utf-8") as fdesc:
            return json.loads(fdesc.read())  # type: ignore
    except FileNotFoundError:
        # A missing file is not fatal; fall through to the default below.
        _LOGGER.debug("JSON file not found: %s", filename)
    except ValueError as error:
        _LOGGER.exception("Could not parse JSON content: %s", filename)
        # Chain explicitly so the original parse error and traceback survive.
        raise ValueError(f"Could not parse JSON content: {filename}") from error
    except OSError as error:
        _LOGGER.exception("JSON file reading failed: %s", filename)
        raise ValueError(f"JSON file reading failed: {filename}") from error
    return {} if default is None else default
def save_json(
    filename: str,
    data: Union[List, Dict],
    private: bool = False,
    *,
    encoder: Optional[Type[json.JSONEncoder]] = None,
) -> None:
    """Atomically save JSON data to a file.

    Serializes first, then writes to a temporary file in the target's
    directory and renames it over the destination, so readers never see
    a partially written file.

    Args:
        filename: Destination path.
        data: JSON-serializable dict or list.
        private: When True, keep the temporary file's restrictive 0o600
            mode; otherwise make the result world-readable (0o644).
        encoder: Optional custom ``json.JSONEncoder`` subclass.

    Raises:
        ValueError: If ``data`` cannot be serialized to JSON.
        OSError: If writing or replacing the file fails.
    """
    try:
        json_data = json.dumps(data, sort_keys=True, indent=4, cls=encoder)
    except TypeError as error:
        msg = (
            "Failed to serialize to JSON: (unknown). Bad data found at "
            f"{', '.join(find_paths_unserializable_data(data))}"
        )
        _LOGGER.error(msg)
        # Chain from the TypeError so the original failure is preserved.
        raise ValueError(msg) from error
    tmp_filename = ""
    tmp_path = os.path.split(filename)[0]
    try:
        # Write beside the target so os.replace stays on one filesystem.
        # Modern versions of Python tempfile create this file with mode 0o600.
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", dir=tmp_path, delete=False
        ) as fdesc:
            fdesc.write(json_data)
            tmp_filename = fdesc.name
        if not private:
            os.chmod(tmp_filename, 0o644)
        os.replace(tmp_filename, filename)
    except OSError:
        _LOGGER.exception("Saving JSON file failed: %s", filename)
        # Bare raise keeps the concrete subtype (e.g. PermissionError)
        # and the original traceback, unlike re-wrapping in OSError(...).
        raise
    finally:
        # On success os.replace consumed the temp file; this only fires
        # after a failure, so suppress likely follow-on cleanup errors.
        if os.path.exists(tmp_filename):
            try:
                os.remove(tmp_filename)
            except OSError as err:
                _LOGGER.error("JSON replacement cleanup failed: %s", err)
def find_paths_unserializable_data(bad_data: Any) -> List[str]:
    """Return the JSON paths of values in *bad_data* that cannot be serialized.

    Performs a breadth-first walk from the root path ``$``, re-attempting
    ``json.dumps`` at each node. This is slow! Only use for error handling.
    """
    pending = deque([(bad_data, "$")])
    bad_paths = []
    while pending:
        current, path = pending.popleft()
        try:
            json.dumps(current)
        except TypeError:
            pass
        else:
            # Serializable as-is; nothing below it needs inspection.
            continue
        if isinstance(current, dict):
            for key, value in current.items():
                try:
                    # Check whether the key itself is a valid JSON key.
                    json.dumps({key: None})
                except TypeError:
                    bad_paths.append(f"{path}<key: {key}>")
                else:
                    pending.append((value, f"{path}.{key}"))
        elif isinstance(current, list):
            for index, item in enumerate(current):
                pending.append((item, f"{path}[{index}]"))
        else:
            bad_paths.append(path)
    return bad_paths
def main(arguments):
    """Execute primary loop."""
    # Configure root logging before doing any work so all output is visible.
    log_format = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    logging.basicConfig(
        format=log_format,
        datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.DEBUG,
    )
    parse_entities()


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment