Last active
November 19, 2021 16:32
-
-
Save nitobuendia/e62b5f5db8d94d54346227c756fc0368 to your computer and use it in GitHub Desktop.
Oura Ring API sleep sensor for Home Assistant (WIP)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Oura sleep integration.""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "domain": "oura",
  "name": "Oura Ring",
  "documentation": "https://cloud.ouraring.com/docs/",
  "dependencies": [],
  "codeowners": [],
  "requirements": []
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example sensor entry for the Oura platform (list item, e.g. under `sensor:`).
- platform: oura
  name: sleep_quality
  client_id: !secret oura_client_id
  client_secret: !secret oura_client_secret
  scan_interval: 7200  # 2h = 2h * 60min * 60 seconds
  # When data for a day is missing, fall back to an earlier day.
  backfill: true
  # The first entry determines the sensor state (its sleep score).
  monitored_variables:
    - yesterday
    - monday
    - tuesday
    - wednesday
    - thursday
    - friday
    - saturday
    - sunday
    - 8d_ago  # Last week, +1 to compare to yesterday.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Sleep sensor from Oura Ring data.""" | |
import datetime | |
from dateutil import parser | |
import enum | |
from homeassistant import const | |
from homeassistant.helpers import config_validation | |
from homeassistant.helpers import entity | |
import json | |
import logging | |
import os | |
import re | |
import requests | |
import voluptuous | |
# Constants. | |
_FULL_WEEKDAY_NAMES = [ | |
'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', | |
'sunday', | |
] | |
# Sensor config. | |
SENSOR = 'oura' | |
_TOKEN_FILE = 'oura-token-cache' | |
_CONF_CLIENT_ID = 'client_id' | |
_CONF_CLIENT_SECRET = 'client_secret' | |
_CONF_BACKFILL = 'backfill' | |
_MAX_BACKFILL = 3 | |
# Oura config. | |
_OURA_API = 'https://api.ouraring.com/v1' | |
_OURA_CLOUD = 'https://cloud.ouraring.com' | |
_MAX_API_RETRIES = 3 | |
class OuraEndpoints(enum.Enum):
    """URLs of the Oura endpoints used by this integration."""

    # Data endpoints (rooted at the API host).
    ACTIVITY = f'{_OURA_API}/activity'
    READINESS = f'{_OURA_API}/readiness'
    SLEEP = f'{_OURA_API}/sleep'
    USER_INFO = f'{_OURA_API}/userinfo'

    # Auth endpoints (rooted at the cloud host).
    AUTHORIZE = f'{_OURA_CLOUD}/oauth/authorize'
    TOKEN = f'{_OURA_CLOUD}/oauth/token'
# Default attributes.
_DEFAULT_NAME = 'sleep_score'
_DEFAULT_MONITORED_VARIABLES = ['yesterday']

# Platform configuration: both client credentials are mandatory; the sensor
# name, the monitored days and the backfill flag fall back to defaults.
PLATFORM_SCHEMA = config_validation.PLATFORM_SCHEMA.extend({
    # OAuth client credentials.
    voluptuous.Required(_CONF_CLIENT_ID):
        config_validation.string,
    voluptuous.Required(_CONF_CLIENT_SECRET):
        config_validation.string,
    # Day names to monitor; a single value is wrapped into a list.
    voluptuous.Optional(
        const.CONF_MONITORED_VARIABLES,
        default=_DEFAULT_MONITORED_VARIABLES,
    ): config_validation.ensure_list,
    # Sensor name.
    voluptuous.Optional(
        const.CONF_NAME,
        default=_DEFAULT_NAME,
    ): config_validation.string,
    # Whether to fall back to earlier dates when a day has no data.
    voluptuous.Optional(
        _CONF_BACKFILL,
        default=False,
    ): config_validation.boolean,
})
class MonitoredDayType(enum.Enum):
    """Kinds of day names that a sensor can monitor."""

    # Name that matches none of the supported formats.
    UNKNOWN = 0
    # The literal name 'yesterday'.
    YESTERDAY = 1
    # A full weekday name, e.g. 'monday'.
    WEEKDAY = 2
    # A relative name such as '8d_ago' or '3days_ago'.
    DAYS_AGO = 3
_EMPTY_SENSOR_ATTRIBUTE = { | |
'date': None, | |
'bedtime_start_hour': None, | |
'bedtime_end_hour': None, | |
'breath_average': None, | |
'temperature_delta': None, | |
'resting_heart_rate': None, | |
'heart_rate_average': None, | |
'deep_sleep_duration': None, | |
'rem_sleep_duration': None, | |
'light_sleep_duration': None, | |
'total_sleep_duration': None, | |
'awake_duration': None, | |
'in_bed_duration': None, | |
} | |
async def setup(hass, config): | |
"""No set up required once token data is obtained.""" | |
return True | |
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Adds the Oura sleep sensor to the list of platform entities.

    Args:
      hass: Home Assistant instance (unused).
      config: Platform configuration handed to the sensor.
      add_devices: Callback used to register the new entities.
      discovery_info: Unused.
    """
    # `setup` is a coroutine function that only returns True. Calling it here
    # without awaiting never ran its body and triggered a "coroutine was never
    # awaited" RuntimeWarning, so the call is intentionally not made.
    # The second argument (True) is passed to the callback as in the original
    # code; presumably it requests an update before adding - TODO confirm.
    add_devices([OuraSleepSensor(config)], True)
def _seconds_to_hours(time_in_seconds): | |
"""Parses times in seconds and converts it to hours.""" | |
return round(int(time_in_seconds) / (60 * 60), 2) | |
def _add_days_to_string_date(string_date, days_to_add): | |
"""Adds (or subtracts) days from a string date. | |
Args: | |
string_date: Original date in YYYY-MM-DD. | |
days_to_add: Number of days to add. Negative to subtract. | |
Returns: | |
Date in YYYY-MM-DD with days added. | |
""" | |
date = datetime.datetime.strptime(string_date, '%Y-%m-%d') | |
new_date = date + datetime.timedelta(days=days_to_add) | |
return str(new_date.date()) | |
class OuraSleepSensor(entity.Entity):
    """Sensor entity exposing Oura Ring sleep data.

    The state is the sleep score of the first monitored day. The attributes
    hold one dictionary of sleep metrics per monitored day name.
    """

    def __init__(self, config):
        """Initializes the sensor.

        Args:
          config: Platform configuration; read for the sensor name, the
            client credentials, the backfill flag and the monitored days.
        """
        # Metadata.
        self._name = config.get(const.CONF_NAME)
        self._backfill = config.get(_CONF_BACKFILL)
        self._api = OuraApi(
            config.get(_CONF_CLIENT_ID),
            config.get(_CONF_CLIENT_SECRET))
        # Day names are normalized to lower case; a missing list is treated
        # as "nothing to monitor" instead of raising a TypeError.
        self._monitored_days = [
            date_name.lower()
            for date_name in (config.get(const.CONF_MONITORED_VARIABLES) or [])
        ]

        # Attributes.
        self._state = None  # Sleep score.
        self._attributes = {}

    # Oura update logic.
    def _get_date_type_by_name(self, date_name):
        """Gets the type of date format based on the date name.

        Args:
          date_name: Date name for which to determine the type.

        Returns:
          Date type (MonitoredDayType).
        """
        if date_name == 'yesterday':
            return MonitoredDayType.YESTERDAY
        if date_name in _FULL_WEEKDAY_NAMES:
            return MonitoredDayType.WEEKDAY
        if 'd_ago' in date_name or 'days_ago' in date_name:
            return MonitoredDayType.DAYS_AGO
        return MonitoredDayType.UNKNOWN

    def _get_date_by_name(self, date_name):
        """Translates a date name into YYYY-MM-DD format for the given day.

        Args:
          date_name: Name of the date to get. Supported:
            yesterday, weekday (e.g. monday, wednesday),
            Xdays_ago (e.g. 3days_ago) and Xd_ago (e.g. 8d_ago).

        Returns:
          Date in YYYY-MM-DD format. Unknown names resolve to yesterday.
        """
        date_type = self._get_date_type_by_name(date_name)
        today = datetime.date.today()
        days_ago = None

        if date_type == MonitoredDayType.YESTERDAY:
            days_ago = 1
        elif date_type == MonitoredDayType.WEEKDAY:
            date_index = _FULL_WEEKDAY_NAMES.index(date_name)
            # Most recent past occurrence of the weekday. When today is that
            # weekday the result is 7, i.e. the same day of last week.
            days_ago = (today.weekday() - date_index) % 7 or 7
        elif date_type == MonitoredDayType.DAYS_AGO:
            # The number of days is the run of digits at the start of the
            # name (e.g. `8` in `8d_ago`).
            digits_match = re.match(r'\d+', date_name)
            if digits_match:
                try:
                    days_ago = int(digits_match.group())
                except ValueError:
                    days_ago = None

        if days_ago is None:
            logging.error(
                f'Oura: Unknown day name `{date_name}`, using yesterday.')
            days_ago = 1

        return str(today - datetime.timedelta(days=days_ago))

    def _get_backfill_date(self, date_name, date_value):
        """Gets the backfill date for a given date and date name.

        Args:
          date_name: Date name to backfill.
          date_value: Last checked date (YYYY-MM-DD).

        Returns:
          Potential backfill date (YYYY-MM-DD). None if the date name is of
          unknown type.
        """
        # Weekdays fall back to the same weekday of the previous week; the
        # other known types fall back to the previous day.
        days_back_by_type = {
            MonitoredDayType.YESTERDAY: -1,
            MonitoredDayType.WEEKDAY: -7,
            MonitoredDayType.DAYS_AGO: -1,
        }
        days_back = days_back_by_type.get(
            self._get_date_type_by_name(date_name))
        if days_back is None:
            return None
        return _add_days_to_string_date(date_value, days_back)

    def _parse_sleep_data(self, oura_data):
        """Processes sleep data into a dictionary.

        Args:
          oura_data: Response of the sleep endpoint; its `sleep` key is read
            as a list of daily entries.

        Returns:
          Dictionary where the key is the summary_date of a daily entry and
          the value is that entry. Empty if there is no usable data.
        """
        if not oura_data or 'sleep' not in oura_data:
            logging.error('Couldn\'t fetch data for Oura ring sensor.')
            return {}

        sleep_data = oura_data.get('sleep')
        if not sleep_data:
            return {}

        # Entries without a summary_date cannot be matched to a day.
        return {
            sleep_daily_data.get('summary_date'): sleep_daily_data
            for sleep_daily_data in sleep_data
            if sleep_daily_data.get('summary_date')
        }

    @staticmethod
    def _format_hour(timestamp):
        """Formats a timestamp string as HH:MM; None if missing or invalid."""
        if not timestamp:
            return None
        try:
            return parser.parse(timestamp).strftime('%H:%M')
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _hours_or_none(time_in_seconds):
        """Converts seconds into hours; None if the value is missing."""
        if time_in_seconds is None:
            return None
        return _seconds_to_hours(time_in_seconds)

    def _build_attributes(self, date_value, sleep):
        """Builds the attributes of one monitored day.

        Args:
          date_value: Date (YYYY-MM-DD) to which the sleep entry belongs.
          sleep: Daily sleep entry from which the metrics are read.

        Returns:
          Dictionary with the same keys as _EMPTY_SENSOR_ATTRIBUTE. Metrics
          absent from the entry are None instead of raising.
        """
        breath_average = sleep.get('breath_average')
        # A missing or empty sample list yields an average of 0, which is
        # what an empty list already produced before.
        heart_rates = sleep.get('hr_5min') or []
        return {
            'date': date_value,
            # HH:MM at which you went bed.
            'bedtime_start_hour': self._format_hour(
                sleep.get('bedtime_start')),
            # HH:MM at which you woke up.
            'bedtime_end_hour': self._format_hour(sleep.get('bedtime_end')),
            # Breaths / minute.
            'breath_average': (
                None if breath_average is None
                else int(round(breath_average, 0))),
            # Temperature deviation in Celsius.
            'temperature_delta': sleep.get('temperature_delta'),
            # Beats / minute (lowest).
            'resting_heart_rate': sleep.get('hr_lowest'),
            # Avg. beats / minute.
            'heart_rate_average': int(round(
                sum(heart_rates) / (len(heart_rates) or 1), 0)),
            # Hours in deep sleep.
            'deep_sleep_duration': self._hours_or_none(sleep.get('deep')),
            # Hours in REM sleep.
            'rem_sleep_duration': self._hours_or_none(sleep.get('rem')),
            # Hours in light sleep.
            'light_sleep_duration': self._hours_or_none(sleep.get('light')),
            # Hours sleeping: deep + rem + light.
            'total_sleep_duration': self._hours_or_none(sleep.get('total')),
            # Hours awake.
            'awake_duration': self._hours_or_none(sleep.get('awake')),
            # Hours in bed: sleep + awake.
            'in_bed_duration': self._hours_or_none(sleep.get('duration')),
        }

    def update(self):
        """Fetches new state data for the sensor."""
        if not self._monitored_days:
            # Nothing to monitor; min()/max() below would raise otherwise.
            return

        sleep_dates = {
            date_name: self._get_date_by_name(date_name)
            for date_name in self._monitored_days
        }

        # Add an extra week to retrieve past week in case current week data
        # is missing.
        start_date = _add_days_to_string_date(min(sleep_dates.values()), -7)
        end_date = max(sleep_dates.values())

        oura_data = self._api.get_sleep_data(start_date, end_date)
        sleep_data = self._parse_sleep_data(oura_data)
        if not sleep_data:
            return

        for date_name, date_value in sleep_dates.items():
            if date_name not in self._attributes:
                self._attributes[date_name] = dict(_EMPTY_SENSOR_ATTRIBUTE)
            self._attributes[date_name]['date'] = date_value

            sleep = sleep_data.get(date_value)

            # Check past dates to see if backfill is possible when missing
            # data. Dates are YYYY-MM-DD strings, so comparing them as
            # strings orders them chronologically.
            backfill = 0
            while (
                    not sleep
                    and self._backfill
                    and date_value >= start_date
                    and backfill < _MAX_BACKFILL):
                last_date_value = date_value
                date_value = self._get_backfill_date(date_name, date_value)
                if not date_value:
                    break
                logging.error(
                    f'Unable to read Oura data for {date_name} '
                    f'({last_date_value}). '
                    f'Fetching {date_value} instead.')
                sleep = sleep_data.get(date_value)
                backfill += 1

            if not sleep:
                logging.error(f'Unable to read Oura data for {date_name}.')
                continue

            # The state gets the value of the sleep score for the first
            # monitored day.
            if date_name == self._monitored_days[0]:
                self._state = sleep.get('score')

            self._attributes[date_name] = self._build_attributes(
                date_value, sleep)

    # Hass.io properties.
    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def device_state_attributes(self):
        """Return the sensor attributes."""
        return self._attributes

    @property
    def extra_state_attributes(self):
        """Return the sensor attributes.

        Same value as device_state_attributes, exposed under the property
        name that replaced it in newer Home Assistant releases.
        """
        return self._attributes
class OuraApi(object):
    """Handles Oura API interactions."""

    # Seconds to wait for an HTTP response; without a timeout a stalled
    # connection would block the caller indefinitely.
    _REQUEST_TIMEOUT = 30

    def __init__(self, client_id, client_secret):
        """Instantiates a new OuraApi class.

        The access and refresh tokens are not loaded here; they are read from
        the token file the first time sleep data is requested.

        Args:
          client_id: Client id used to authenticate token refreshes.
          client_secret: Client secret used to authenticate token refreshes.
        """
        self._client_id = client_id
        self._client_secret = client_secret
        self._access_token = None
        self._refresh_token = None

    def get_sleep_data(self, start_date, end_date=None):
        """Fetches data from the sleep OuraEndpoint for a range of dates.

        Args:
          start_date: First day for which to fetch data (YYYY-MM-DD).
          end_date: Last day for which to retrieve data (YYYY-MM-DD).
            If same as start_date, leave empty.

        Returns:
          Parsed response containing Oura sleep data. None if no data could
          be fetched after _MAX_API_RETRIES attempts.
        """
        if not self._access_token:
            self._get_access_token_data_from_file()

        for _ in range(_MAX_API_RETRIES):
            # The URL is rebuilt on every attempt because the access token
            # may have been refreshed by the previous one.
            api_url = self._get_api_endpoint(
                OuraEndpoints.SLEEP, start_date=start_date, end_date=end_date)

            try:
                response = requests.get(api_url, timeout=self._REQUEST_TIMEOUT)
                response_data = response.json()
            except (requests.RequestException, ValueError) as error:
                # Only the error type is logged: the error text may contain
                # the request URL, which embeds the access token.
                logging.error(
                    'Oura API request failed (%s).', type(error).__name__)
                continue

            if not response_data:
                continue

            if (isinstance(response_data, dict)
                    and response_data.get('message') == 'Unauthorized'):
                self._refresh_access_token()
                continue

            return response_data

        logging.error(
            'Couldn\'t fetch data for Oura ring sensor. Verify API token.')
        return None

    def _get_api_endpoint(self, api_endpoint, **kwargs):
        """Gets URL for a given endpoint and day.

        Args:
          api_endpoint: Endpoint (OuraEndpoints) from which to fetch data.

        Optional Keyword Args:
          start_date: Day for which to fetch data (YYYY-MM-DD).
            > Used for SLEEP endpoint.
          end_date: Day to which to fetch data (YYYY-MM-DD).
            > Optional, used for SLEEP endpoint.

        Returns:
          Full Oura API endpoint URL.
        """
        if api_endpoint == OuraEndpoints.TOKEN:
            # The token endpoint takes no access token.
            return api_endpoint.value

        api_url = '{base_url}?access_token={access_token}'.format(
            base_url=api_endpoint.value,
            access_token=self._access_token,
        )

        if api_endpoint == OuraEndpoints.SLEEP:
            if kwargs.get('start_date'):
                api_url += '&start={}'.format(kwargs['start_date'])
            if kwargs.get('end_date'):
                api_url += '&end={}'.format(kwargs['end_date'])

        return api_url

    def _get_access_token_data_from_file(self):
        """Gets credentials data from the credentials file."""
        if not os.path.isfile(_TOKEN_FILE):
            # TODO: add support for more than one sensor based in sensor name.
            # TODO: add workflow to get first access and refresh token.
            logging.error('Unable to find Oura credentials file.')
            return

        try:
            with open(_TOKEN_FILE, 'r') as token_file:
                token_data = json.loads(token_file.read()) or {}
        except (OSError, ValueError):
            # Unreadable or corrupt file: keep the current tokens instead of
            # crashing the sensor update.
            logging.error('Unable to read Oura credentials file.')
            return

        if not isinstance(token_data, dict):
            logging.error('Unable to read Oura credentials file.')
            return

        self._access_token = token_data.get('access_token')
        self._refresh_token = token_data.get('refresh_token')

    def _store_access_token_data_to_file(self, access_token_data):
        """Stores credentials data to the credentials file.

        Args:
          access_token_data: Credentials data in a dictionary.
            Should contain at least access_token and refresh_token.
        """
        if not access_token_data or 'access_token' not in access_token_data:
            logging.error('Oura Access token not provided.')
            return

        if 'refresh_token' not in access_token_data:
            logging.error('Oura Refresh token not provided.')
            return

        try:
            with open(_TOKEN_FILE, 'w') as token_file:
                token_file.write(json.dumps(access_token_data))
        except OSError:
            logging.error('Unable to write Oura credentials file.')

    def _refresh_access_token(self):
        """Gets a new access token using refresh token."""
        if not self._refresh_token:
            # Without a refresh token the request cannot succeed.
            logging.error('Oura API was unable to retrieve new API token.')
            return

        access_token_data = self._request_access_token_with_refresh_token()

        if (not isinstance(access_token_data, dict)
                or 'access_token' not in access_token_data):
            logging.error('Oura API was unable to retrieve new API token.')
            return

        if 'refresh_token' not in access_token_data:
            access_token_data['refresh_token'] = self._refresh_token

        # Update the in-memory tokens first so that they are usable even if
        # writing the token file fails.
        self._access_token = access_token_data['access_token']
        self._refresh_token = access_token_data['refresh_token']
        self._store_access_token_data_to_file(access_token_data)

    def _request_access_token_with_refresh_token(self):
        """Sends a request to get access token with the existing refresh token.

        Returns:
          Response from requesting new token.
          Most likely: Access and refresh token data in a dictionary.
          Empty dictionary if the request failed or returned invalid JSON.
        """
        request_auth = requests.auth.HTTPBasicAuth(self._client_id,
                                                   self._client_secret)
        request_data = {
            'grant_type': 'refresh_token',
            'refresh_token': self._refresh_token,
        }
        request_url = self._get_api_endpoint(OuraEndpoints.TOKEN)

        try:
            response = requests.post(
                request_url, auth=request_auth, data=request_data,
                timeout=self._REQUEST_TIMEOUT)
            return response.json()
        except (requests.RequestException, ValueError) as error:
            logging.error(
                'Oura token request failed (%s).', type(error).__name__)
            return {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment