Created
August 7, 2022 23:40
-
-
Save DTTerastar/dd31529a13296b3b3ae19e7b547984fd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""iCloud account.""" | |
from __future__ import annotations | |
from datetime import timedelta | |
import logging | |
import operator | |
from typing import Any | |
from pyicloud import PyiCloudService | |
from pyicloud.exceptions import ( | |
PyiCloudFailedLoginException, | |
PyiCloudNoDevicesException, | |
PyiCloudServiceNotActivatedException, | |
) | |
from pyicloud.services.findmyiphone import AppleDevice | |
from homeassistant.components.zone import async_active_zone | |
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry | |
from homeassistant.const import ATTR_ATTRIBUTION, CONF_USERNAME | |
from homeassistant.core import HomeAssistant | |
from homeassistant.exceptions import ConfigEntryNotReady | |
from homeassistant.helpers.dispatcher import dispatcher_send | |
from homeassistant.helpers.event import track_point_in_utc_time | |
from homeassistant.helpers.storage import Store | |
from homeassistant.util import slugify | |
from homeassistant.util.async_ import run_callback_threadsafe | |
from homeassistant.util.dt import utcnow | |
from homeassistant.util.location import distance | |
from .const import ( | |
DEVICE_BATTERY_LEVEL, | |
DEVICE_BATTERY_STATUS, | |
DEVICE_CLASS, | |
DEVICE_DISPLAY_NAME, | |
DEVICE_ID, | |
DEVICE_LOCATION, | |
DEVICE_LOCATION_HORIZONTAL_ACCURACY, | |
DEVICE_LOCATION_LATITUDE, | |
DEVICE_LOCATION_LONGITUDE, | |
DEVICE_LOST_MODE_CAPABLE, | |
DEVICE_LOW_POWER_MODE, | |
DEVICE_NAME, | |
DEVICE_PERSON_ID, | |
DEVICE_RAW_DEVICE_MODEL, | |
DEVICE_STATUS, | |
DEVICE_STATUS_CODES, | |
DEVICE_STATUS_SET, | |
DOMAIN, | |
) | |
ATTRIBUTION = "Data provided by Apple iCloud" | |
# entity attributes | |
ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval" | |
ATTR_BATTERY = "battery" | |
ATTR_BATTERY_STATUS = "battery_status" | |
ATTR_DEVICE_NAME = "device_name" | |
ATTR_DEVICE_STATUS = "device_status" | |
ATTR_LOW_POWER_MODE = "low_power_mode" | |
ATTR_OWNER_NAME = "owner_fullname" | |
# services | |
SERVICE_ICLOUD_PLAY_SOUND = "play_sound" | |
SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message" | |
SERVICE_ICLOUD_LOST_DEVICE = "lost_device" | |
SERVICE_ICLOUD_UPDATE = "update" | |
ATTR_ACCOUNT = "account" | |
ATTR_LOST_DEVICE_MESSAGE = "message" | |
ATTR_LOST_DEVICE_NUMBER = "number" | |
ATTR_LOST_DEVICE_SOUND = "sound" | |
_LOGGER = logging.getLogger(__name__) | |
class IcloudAccount: | |
"""Representation of an iCloud account.""" | |
def __init__( | |
self, | |
hass: HomeAssistant, | |
username: str, | |
password: str, | |
icloud_dir: Store, | |
with_family: bool, | |
max_interval: int, | |
gps_accuracy_threshold: int, | |
config_entry: ConfigEntry, | |
) -> None: | |
"""Initialize an iCloud account.""" | |
self.hass = hass | |
self._username = username | |
self._password = password | |
self._with_family = with_family | |
self._fetch_interval: float = max_interval | |
self._max_interval = max_interval | |
self._gps_accuracy_threshold = gps_accuracy_threshold | |
self._icloud_dir = icloud_dir | |
self.api: PyiCloudService | None = None | |
self._owner_fullname: str | None = None | |
self._family_members_fullname: dict[str, str] = {} | |
self._devices: dict[str, IcloudDevice] = {} | |
self._retried_fetch = False | |
self._config_entry = config_entry | |
self.listeners = [] | |
def setup(self) -> None: | |
"""Set up an iCloud account.""" | |
try: | |
self.api = PyiCloudService( | |
self._username, | |
self._password, | |
self._icloud_dir.path, | |
with_family=self._with_family, | |
) | |
if self.api.requires_2fa: | |
# Trigger a new log in to ensure the user enters the 2FA code again. | |
raise PyiCloudFailedLoginException | |
except PyiCloudFailedLoginException: | |
self.api = None | |
# Login failed which means credentials need to be updated. | |
_LOGGER.error( | |
( | |
"Your password for '%s' is no longer working; Go to the " | |
"Integrations menu and click on Configure on the discovered Apple " | |
"iCloud card to login again" | |
), | |
self._config_entry.data[CONF_USERNAME], | |
) | |
self._require_reauth() | |
return | |
try: | |
api_devices = self.api.devices | |
# Gets device owners infos | |
user_info = api_devices.response["userInfo"] | |
except ( | |
PyiCloudServiceNotActivatedException, | |
PyiCloudNoDevicesException, | |
) as err: | |
_LOGGER.error("No iCloud device found") | |
raise ConfigEntryNotReady from err | |
self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}" | |
self._family_members_fullname = {} | |
if user_info.get("membersInfo") is not None: | |
for prs_id, member in user_info["membersInfo"].items(): | |
self._family_members_fullname[ | |
prs_id | |
] = f"{member['firstName']} {member['lastName']}" | |
self._devices = {} | |
self.update_devices() | |
def update_devices(self) -> None: | |
"""Update iCloud devices.""" | |
if self.api is None: | |
return | |
if self.api.requires_2fa: | |
self._require_reauth() | |
return | |
api_devices = {} | |
try: | |
api_devices = self.api.devices | |
except Exception as err: # pylint: disable=broad-except | |
_LOGGER.error("Unknown iCloud error: %s", err) | |
self._fetch_interval = 2 | |
dispatcher_send(self.hass, self.signal_device_update) | |
track_point_in_utc_time( | |
self.hass, | |
self.keep_alive, | |
utcnow() + timedelta(minutes=self._fetch_interval), | |
) | |
return | |
# Gets devices infos | |
new_device = False | |
for device in api_devices: | |
status = device.status(DEVICE_STATUS_SET) | |
device_id = status[DEVICE_ID] | |
device_name = status[DEVICE_NAME] | |
if ( | |
status[DEVICE_BATTERY_STATUS] == "Unknown" | |
or status.get(DEVICE_BATTERY_LEVEL) is None | |
): | |
continue | |
if self._devices.get(device_id) is not None: | |
# Seen device -> updating | |
_LOGGER.debug("Updating iCloud device: %s", device_name) | |
self._devices[device_id].update(status) | |
else: | |
# New device, should be unique | |
_LOGGER.debug( | |
"Adding iCloud device: %s [model: %s]", | |
device_name, | |
status[DEVICE_RAW_DEVICE_MODEL], | |
) | |
self._devices[device_id] = IcloudDevice(self, device, status) | |
self._devices[device_id].update(status) | |
new_device = True | |
if ( | |
DEVICE_STATUS_CODES.get(list(api_devices)[0][DEVICE_STATUS]) == "pending" | |
and not self._retried_fetch | |
): | |
_LOGGER.debug("Pending devices, trying again in 15s") | |
self._fetch_interval = 0.25 | |
self._retried_fetch = True | |
else: | |
self._fetch_interval = self._determine_interval() | |
self._retried_fetch = False | |
dispatcher_send(self.hass, self.signal_device_update) | |
if new_device: | |
dispatcher_send(self.hass, self.signal_device_new) | |
track_point_in_utc_time( | |
self.hass, | |
self.keep_alive, | |
utcnow() + timedelta(minutes=self._fetch_interval), | |
) | |
def _require_reauth(self): | |
"""Require the user to log in again.""" | |
self.hass.add_job( | |
self.hass.config_entries.flow.async_init( | |
DOMAIN, | |
context={"source": SOURCE_REAUTH}, | |
data={ | |
**self._config_entry.data, | |
"unique_id": self._config_entry.unique_id, | |
}, | |
) | |
) | |
def _determine_interval(self) -> int: | |
"""Calculate new interval between two API fetch (in minutes).""" | |
intervals = {"default": self._max_interval} | |
for device in self._devices.values(): | |
# Max interval if no location | |
if device.location is None: | |
continue | |
current_zone = run_callback_threadsafe( | |
self.hass.loop, | |
async_active_zone, | |
self.hass, | |
device.location[DEVICE_LOCATION_LATITUDE], | |
device.location[DEVICE_LOCATION_LONGITUDE], | |
device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY], | |
).result() | |
# Max interval if in zone | |
if current_zone is not None: | |
continue | |
zones = ( | |
self.hass.states.get(entity_id) | |
for entity_id in sorted(self.hass.states.entity_ids("zone")) | |
) | |
distances = [] | |
for zone_state in zones: | |
if zone_state is None: | |
continue | |
zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE] | |
zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE] | |
zone_distance = distance( | |
device.location[DEVICE_LOCATION_LATITUDE], | |
device.location[DEVICE_LOCATION_LONGITUDE], | |
zone_state_lat, | |
zone_state_long, | |
) | |
if zone_distance is not None: | |
distances.append(round(zone_distance / 1000, 1)) | |
# Max interval if no zone | |
if not distances: | |
continue | |
mindistance = min(distances) | |
# Calculate out how long it would take for the device to drive | |
# to the nearest zone at 120 km/h: | |
interval = round(mindistance / 2) | |
# Never poll more than once per minute | |
interval = max(interval, 1) | |
if interval > 180: | |
# Three hour drive? | |
# This is far enough that they might be flying | |
interval = self._max_interval | |
if ( | |
device.battery_level is not None | |
and device.battery_level <= 33 | |
and mindistance > 3 | |
): | |
# Low battery - let's check half as often | |
interval = interval * 2 | |
intervals[device.name] = interval | |
return max( | |
int(min(intervals.items(), key=operator.itemgetter(1))[1]), | |
self._max_interval, | |
) | |
def keep_alive(self, now=None) -> None: | |
"""Keep the API alive.""" | |
if self.api is None: | |
self.setup() | |
if self.api is None: | |
return | |
self.api.authenticate() | |
self.update_devices() | |
def get_devices_with_name(self, name: str) -> list[Any]: | |
"""Get devices by name.""" | |
result = [] | |
name_slug = slugify(name.replace(" ", "", 99)) | |
for device in self.devices.values(): | |
if slugify(device.name.replace(" ", "", 99)) == name_slug: | |
result.append(device) | |
if not result: | |
raise Exception(f"No device with name {name}") | |
return result | |
@property | |
def username(self) -> str: | |
"""Return the account username.""" | |
return self._username | |
@property | |
def owner_fullname(self) -> str | None: | |
"""Return the account owner fullname.""" | |
return self._owner_fullname | |
@property | |
def family_members_fullname(self) -> dict[str, str]: | |
"""Return the account family members fullname.""" | |
return self._family_members_fullname | |
@property | |
def fetch_interval(self) -> float: | |
"""Return the account fetch interval.""" | |
return self._fetch_interval | |
@property | |
def devices(self) -> dict[str, Any]: | |
"""Return the account devices.""" | |
return self._devices | |
@property | |
def signal_device_new(self) -> str: | |
"""Event specific per Freebox entry to signal new device.""" | |
return f"{DOMAIN}-{self._username}-device-new" | |
@property | |
def signal_device_update(self) -> str: | |
"""Event specific per Freebox entry to signal updates in devices.""" | |
return f"{DOMAIN}-{self._username}-device-update" | |
class IcloudDevice: | |
"""Representation of a iCloud device.""" | |
def __init__(self, account: IcloudAccount, device: AppleDevice, status) -> None: | |
"""Initialize the iCloud device.""" | |
self._account = account | |
self._device = device | |
self._status = status | |
self._name = self._status[DEVICE_NAME] | |
self._device_id = self._status[DEVICE_ID] | |
self._device_class = self._status[DEVICE_CLASS] | |
self._device_model = self._status[DEVICE_DISPLAY_NAME] | |
self._battery_level: int | None = None | |
self._battery_status = None | |
self._location = None | |
self._attrs = { | |
ATTR_ATTRIBUTION: ATTRIBUTION, | |
ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval, | |
ATTR_DEVICE_NAME: self._device_model, | |
ATTR_DEVICE_STATUS: None, | |
} | |
if self._status[DEVICE_PERSON_ID]: | |
self._attrs[ATTR_OWNER_NAME] = account.family_members_fullname[ | |
self._status[DEVICE_PERSON_ID] | |
] | |
elif account.owner_fullname is not None: | |
self._attrs[ATTR_OWNER_NAME] = account.owner_fullname | |
def update(self, status) -> None: | |
"""Update the iCloud device.""" | |
self._status = status | |
self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval | |
device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error") | |
self._attrs[ATTR_DEVICE_STATUS] = device_status | |
self._battery_status = self._status[DEVICE_BATTERY_STATUS] | |
self._attrs[ATTR_BATTERY_STATUS] = self._battery_status | |
device_battery_level = self._status.get(DEVICE_BATTERY_LEVEL, 0) | |
if self._battery_status != "Unknown" and device_battery_level is not None: | |
self._battery_level = int(device_battery_level * 100) | |
self._attrs[ATTR_BATTERY] = self._battery_level | |
self._attrs[ATTR_LOW_POWER_MODE] = self._status[DEVICE_LOW_POWER_MODE] | |
if ( | |
self._status[DEVICE_LOCATION] | |
and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE] | |
): | |
location = self._status[DEVICE_LOCATION] | |
if self._location is None: | |
dispatcher_send(self._account.hass, self._account.signal_device_new) | |
self._location = location | |
def play_sound(self) -> None: | |
"""Play sound on the device.""" | |
if self._account.api is None: | |
return | |
self._account.api.authenticate() | |
_LOGGER.debug("Playing sound for %s", self.name) | |
self.device.play_sound() | |
def display_message(self, message: str, sound: bool = False) -> None: | |
"""Display a message on the device.""" | |
if self._account.api is None: | |
return | |
self._account.api.authenticate() | |
_LOGGER.debug("Displaying message for %s", self.name) | |
self.device.display_message("Subject not working", message, sound) | |
def lost_device(self, number: str, message: str) -> None: | |
"""Make the device in lost state.""" | |
if self._account.api is None: | |
return | |
self._account.api.authenticate() | |
if self._status[DEVICE_LOST_MODE_CAPABLE]: | |
_LOGGER.debug("Make device lost for %s", self.name) | |
self.device.lost_device(number, message, None) | |
else: | |
_LOGGER.error("Cannot make device lost for %s", self.name) | |
@property | |
def unique_id(self) -> str: | |
"""Return a unique ID.""" | |
return self._device_id | |
@property | |
def name(self) -> str: | |
"""Return the Apple device name.""" | |
return self._name | |
@property | |
def device(self) -> AppleDevice: | |
"""Return the Apple device.""" | |
return self._device | |
@property | |
def device_class(self) -> str: | |
"""Return the Apple device class.""" | |
return self._device_class | |
@property | |
def device_model(self) -> str: | |
"""Return the Apple device model.""" | |
return self._device_model | |
@property | |
def battery_level(self) -> int | None: | |
"""Return the Apple device battery level.""" | |
return self._battery_level | |
@property | |
def battery_status(self) -> str | None: | |
"""Return the Apple device battery status.""" | |
return self._battery_status | |
@property | |
def location(self) -> dict[str, Any] | None: | |
"""Return the Apple device location.""" | |
return self._location | |
@property | |
def extra_state_attributes(self) -> dict[str, Any]: | |
"""Return the attributes.""" | |
return self._attrs |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment