Skip to content

Instantly share code, notes, and snippets.

@craiggenner
Forked from imduffy15/alexa-sync.py
Last active February 14, 2023 17:23
Show Gist options
  • Save craiggenner/6732b1b5f5c38dd5db2ebeb5bcb14121 to your computer and use it in GitHub Desktop.
Save craiggenner/6732b1b5f5c38dd5db2ebeb5bcb14121 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import json
import requests
import urllib3
import sys
urllib3.disable_warnings()
cookie = open("cookie.txt").readline().strip()
csrf_token = [x.split("=") for x in cookie.split(";") if "csrf" in x][0][1]
headers = {"csrf": csrf_token, "Cookie": cookie}
# Get all existing groups...
groups = requests.get(
"https://alexa.amazon.co.uk/api/phoenix/group", headers=headers, verify=False
)
if groups.status_code == 401:
print("Auth error, update cookies")
sys.exit(1)
groups = groups.json()
# Delete all existing groups
for group in groups["applianceGroups"]:
print(f"Deleting group {group.get('name')}")
delete = requests.delete(
f"https://alexa.amazon.co.uk/api/phoenix/group/{group.get('groupId')}",
headers=headers,
verify=False,
)
delete.raise_for_status()
# Get home assistant entities and their areas
entity_registry_file = open("core.entity_registry")
entity_registry = json.load(entity_registry_file)
device_registry_file = open("core.device_registry")
device_registry = json.load(device_registry_file)['data']['devices']
device_to_area = {
x["id"]: {"area_id": x.get("area_id"), "name": x.get("name"), "manufacturer": x.get("manufacturer"), "identifiers": x.get("identifiers")}
for x in device_registry
if x.get("area_id")
}
new_entity_registry = {}
for x in entity_registry["data"]["entities"]:
if x.get("area_id"):
new_entity_registry[x["entity_id"]]= {"area_id": x.get("area_id"), "name": x.get("name")}
elif x.get("device_id") and device_to_area.get(x["device_id"]):
area_id = device_to_area[x["device_id"]]["area_id"]
new_entity_registry[x["entity_id"]]= {"area_id": area_id, "name": x.get("name")}
entity_registry = new_entity_registry
# Get the areas so we have their pretty name format
area_registry_file = open("core.area_registry")
area_registry = json.load(area_registry_file)
areas = {
x.get("id"): x.get("name") for x in area_registry.get("data", []).get("areas", [])
}
# Get all devices on alexa
# This fails for some reason sometimes so there's terrible retry logic here.
raw_devices_json = None
while raw_devices_json is None:
try:
raw_devices = requests.get(
"https://alexa.amazon.co.uk/api/phoenix", headers=headers, verify=False
).json()
raw_devices_json = json.loads(raw_devices["networkDetail"])
except Exception:
time.sleep(1)
group_updates = {}
# Map the alexa devices to their home assistant device and pull the area
# id from home assistant
for skill, details in raw_devices_json["locationDetails"]["locationDetails"][
"Default_Location"
]["amazonBridgeDetails"]["amazonBridgeDetails"].items():
details = details["applianceDetails"]["applianceDetails"]
for device, value in details.items():
if value.get("manufacturerName", "") == "Royal Philips Electronics":
print("Found Royal Philips Electronics")
elif value.get("friendlyDescription", "") == "Amazon smart device":
for x in device_to_area:
if value.get("friendlyName", "") == device_to_area[x]['name'] and device_to_area[x]['manufacturer'] == "Amazon" and "alexa_media" in device_to_area[x]['identifiers'][0]:
group_name = areas.get(device_to_area[x]['area_id'], "")
updates = group_updates.get(group_name, [])
updates.append(value.get("applianceId"))
group_updates[group_name] = updates
elif value.get("manufacturerName", "") == "Home Assistant":
home_assistant_entity_id = (
value.get("friendlyDescription").split(" ")[0].strip()
)
group_name = areas.get(
entity_registry.get(home_assistant_entity_id, {}).get("area_id", ""), {}
)
if group_name:
updates = group_updates.get(group_name, [])
updates.append(value.get("applianceId"))
group_updates[group_name] = updates
# Create all the groups with their devices
for group_name, devices in group_updates.items():
body = {
"applianceIds": devices,
"name": group_name,
}
try:
pretty = {"group": group_name, "devices": [device.split("#")[1] if '#' in device else device for device in devices]}
#print(json.dumps(pretty, indent=2))
print(f"Creating group {group_name}")
update_group = requests.post(
f"https://alexa.amazon.co.uk/api/phoenix/group",
headers=headers,
verify=False,
json=body,
)
update_group.raise_for_status()
except:
print("Failed to process this payload:")
print(json.dumps(body, indent=2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment