-
-
Save craiggenner/6732b1b5f5c38dd5db2ebeb5bcb14121 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import json
import sys
import time

import requests
import urllib3
# Every request below is sent with verify=False, so silence the
# InsecureRequestWarning that urllib3 would otherwise print for each call.
urllib3.disable_warnings()

# The first line of cookie.txt holds the raw "Cookie" header value copied
# from a logged-in browser session.
with open("cookie.txt") as cookie_file:
    cookie = cookie_file.readline().strip()

# Pull the CSRF token out of the cookie string. Match on the cookie *name*
# and keep everything after the first "=" so that a token which itself
# contains "=" is not truncated.
csrf_token = None
for cookie_part in cookie.split(";"):
    cookie_name, _, cookie_value = cookie_part.partition("=")
    if "csrf" in cookie_name:
        csrf_token = cookie_value.strip()
        break

if not csrf_token:
    print("No csrf cookie found in cookie.txt, update cookies")
    sys.exit(1)

headers = {"csrf": csrf_token, "Cookie": cookie}
# Get all existing groups.
# `groups` ends up holding the decoded JSON payload; the code that follows
# reads its "applianceGroups" list.
groups_response = requests.get(
    "https://alexa.amazon.co.uk/api/phoenix/group", headers=headers, verify=False
)
if groups_response.status_code == 401:
    print("Auth error, update cookies")
    sys.exit(1)
# Any other HTTP failure should stop the script here with a clear error
# instead of surfacing later as a JSON decode error or KeyError.
groups_response.raise_for_status()
groups = groups_response.json()
# Remove every group currently defined on the account so the groups can be
# rebuilt from scratch further down. Any failed deletion aborts the script.
for existing_group in groups["applianceGroups"]:
    print(f"Deleting group {existing_group.get('name')}")
    group_url = (
        f"https://alexa.amazon.co.uk/api/phoenix/group/{existing_group.get('groupId')}"
    )
    delete_response = requests.delete(group_url, headers=headers, verify=False)
    delete_response.raise_for_status()
# Get home assistant entities and their areas.
# The registry files are read through context managers so the handles are
# closed promptly, and decoded as UTF-8 regardless of the platform default.
with open("core.entity_registry", encoding="utf-8") as entity_registry_file:
    entity_registry = json.load(entity_registry_file)
with open("core.device_registry", encoding="utf-8") as device_registry_file:
    device_registry = json.load(device_registry_file)["data"]["devices"]

# Device id -> details, restricted to devices that are assigned to an area.
device_to_area = {
    device_entry["id"]: {
        "area_id": device_entry.get("area_id"),
        "name": device_entry.get("name"),
        "manufacturer": device_entry.get("manufacturer"),
        "identifiers": device_entry.get("identifiers"),
    }
    for device_entry in device_registry
    if device_entry.get("area_id")
}

# Entity id -> {"area_id", "name"}. An entity's own area wins; otherwise it
# inherits the area of its parent device. Entities with no resolvable area
# are left out entirely.
new_entity_registry = {}
for entity_entry in entity_registry["data"]["entities"]:
    area_id = entity_entry.get("area_id")
    if not area_id:
        parent_device = device_to_area.get(entity_entry.get("device_id"))
        area_id = parent_device["area_id"] if parent_device else None
    if area_id:
        new_entity_registry[entity_entry["entity_id"]] = {
            "area_id": area_id,
            "name": entity_entry.get("name"),
        }
entity_registry = new_entity_registry
# Get the areas so we have their pretty name format.
# `areas` maps area id -> display name.
with open("core.area_registry", encoding="utf-8") as area_registry_file:
    area_registry = json.load(area_registry_file)
# The fallback for "data" must be a dict: a list default has no .get() and
# would raise AttributeError when the key is absent.
areas = {
    area_entry.get("id"): area_entry.get("name")
    for area_entry in area_registry.get("data", {}).get("areas", [])
}
# Get all devices on alexa.
# This request fails intermittently, so it is retried once per second, but
# only a bounded number of times so that a persistent failure (expired
# cookies, network outage) ends the script instead of looping forever.
MAX_DEVICE_FETCH_ATTEMPTS = 30
raw_devices_json = None
for attempt in range(1, MAX_DEVICE_FETCH_ATTEMPTS + 1):
    try:
        raw_devices = requests.get(
            "https://alexa.amazon.co.uk/api/phoenix", headers=headers, verify=False
        ).json()
        # "networkDetail" is itself a JSON document encoded as a string.
        raw_devices_json = json.loads(raw_devices["networkDetail"])
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        print(
            f"Fetching Alexa devices failed "
            f"(attempt {attempt}/{MAX_DEVICE_FETCH_ATTEMPTS}): {err!r}"
        )
    if raw_devices_json is not None:
        break
    time.sleep(1)

if raw_devices_json is None:
    print("Could not fetch the Alexa device list, giving up")
    sys.exit(1)
# Group name -> list of Alexa appliance ids that belong in that group.
group_updates = {}

# Map the alexa devices to their home assistant device and pull the area
# id from home assistant
bridge_details = raw_devices_json["locationDetails"]["locationDetails"][
    "Default_Location"
]["amazonBridgeDetails"]["amazonBridgeDetails"]

for details in bridge_details.values():
    appliances = details["applianceDetails"]["applianceDetails"]
    for value in appliances.values():
        manufacturer = value.get("manufacturerName", "")
        if manufacturer == "Royal Philips Electronics":
            # Only reported; these appliances are not added to any group.
            print("Found Royal Philips Electronics")
        elif value.get("friendlyDescription", "") == "Amazon smart device":
            # Match on friendly name against Home Assistant devices whose
            # first identifier entry contains "alexa_media".
            friendly_name = value.get("friendlyName", "")
            for ha_device in device_to_area.values():
                # identifiers may be None or empty; guard before indexing.
                identifiers = ha_device["identifiers"] or []
                if (
                    friendly_name == ha_device["name"]
                    and ha_device["manufacturer"] == "Amazon"
                    and identifiers
                    and "alexa_media" in identifiers[0]
                ):
                    group_name = areas.get(ha_device["area_id"], "")
                    # Skip unknown areas rather than creating a group with
                    # an empty name, mirroring the Home Assistant branch.
                    if group_name:
                        group_updates.setdefault(group_name, []).append(
                            value.get("applianceId")
                        )
        elif manufacturer == "Home Assistant":
            # The entity id is taken as the first space-separated word of
            # the description; a missing description yields "".
            description = value.get("friendlyDescription") or ""
            home_assistant_entity_id = description.split(" ")[0].strip()
            area_id = entity_registry.get(home_assistant_entity_id, {}).get(
                "area_id", ""
            )
            group_name = areas.get(area_id)
            if group_name:
                group_updates.setdefault(group_name, []).append(
                    value.get("applianceId")
                )
# Create all the groups with their devices.
# A failure for one group is reported and the remaining groups are still
# attempted.
for group_name, devices in group_updates.items():
    body = {
        "applianceIds": devices,
        "name": group_name,
    }
    print(f"Creating group {group_name}")
    try:
        update_group = requests.post(
            "https://alexa.amazon.co.uk/api/phoenix/group",
            headers=headers,
            verify=False,
            json=body,
        )
        update_group.raise_for_status()
    except requests.RequestException as err:
        # Catch only request/HTTP failures so Ctrl-C and sys.exit() still
        # terminate the script.
        print("Failed to process this payload:")
        print(json.dumps(body, indent=2))
        print(f"Reason: {err}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment