Skip to content

Instantly share code, notes, and snippets.

@nitobuendia
Last active November 19, 2021 16:32
Show Gist options
  • Save nitobuendia/e62b5f5db8d94d54346227c756fc0368 to your computer and use it in GitHub Desktop.
Save nitobuendia/e62b5f5db8d94d54346227c756fc0368 to your computer and use it in GitHub Desktop.
Oura API Ring Sleep sensor for Home Assistant (WIP)
"""Oura sleep integration."""
{
"domain": "oura",
"name": "Oura Ring",
"documentation": "https://cloud.ouraring.com/docs/",
"dependencies": [],
"codeowners": [],
"requirements": []
}
- platform: oura
name: sleep_quality
client_id: !secret oura_client_id
client_secret: !secret oura_client_secret
scan_interval: 7200 # 2 hours, in seconds (2 * 60 * 60).
backfill: True
monitored_variables:
- yesterday
- monday
- tuesday
- wednesday
- thursday
- friday
- saturday
- sunday
- 8d_ago # Same weekday as yesterday, one week earlier (7 + 1 days ago).
"""Sleep sensor from Oura Ring data."""
import datetime
from dateutil import parser
import enum
from homeassistant import const
from homeassistant.helpers import config_validation
from homeassistant.helpers import entity
import json
import logging
import os
import re
import requests
import voluptuous
# Constants.
_FULL_WEEKDAY_NAMES = [
'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday',
'sunday',
]
# Sensor config.
SENSOR = 'oura'
_TOKEN_FILE = 'oura-token-cache'
_CONF_CLIENT_ID = 'client_id'
_CONF_CLIENT_SECRET = 'client_secret'
_CONF_BACKFILL = 'backfill'
_MAX_BACKFILL = 3
# Oura config.
_OURA_API = 'https://api.ouraring.com/v1'
_OURA_CLOUD = 'https://cloud.ouraring.com'
_MAX_API_RETRIES = 3
class OuraEndpoints(enum.Enum):
  """Oura URLs used by this platform; each member's value is a full URL."""

  # Data endpoints, served from the Oura API host.
  ACTIVITY = f'{_OURA_API}/activity'
  READINESS = f'{_OURA_API}/readiness'
  SLEEP = f'{_OURA_API}/sleep'
  USER_INFO = f'{_OURA_API}/userinfo'

  # Auth endpoints, served from the Oura cloud host.
  AUTHORIZE = f'{_OURA_CLOUD}/oauth/authorize'
  TOKEN = f'{_OURA_CLOUD}/oauth/token'
# Default attributes.
_DEFAULT_NAME = 'sleep_score'
_DEFAULT_MONITORED_VARIABLES = ['yesterday']

# Options accepted by this platform, on top of the base platform schema.
PLATFORM_SCHEMA = config_validation.PLATFORM_SCHEMA.extend({
    # Mandatory client credentials.
    voluptuous.Required(_CONF_CLIENT_ID): config_validation.string,
    voluptuous.Required(_CONF_CLIENT_SECRET): config_validation.string,
    # Day names to monitor; a single value is wrapped into a list.
    voluptuous.Optional(
        const.CONF_MONITORED_VARIABLES,
        default=_DEFAULT_MONITORED_VARIABLES,
    ): config_validation.ensure_list,
    # Sensor name.
    voluptuous.Optional(
        const.CONF_NAME,
        default=_DEFAULT_NAME,
    ): config_validation.string,
    # Whether older dates may stand in for days without data.
    voluptuous.Optional(
        _CONF_BACKFILL,
        default=False,
    ): config_validation.boolean,
})
class MonitoredDayType(enum.Enum):
  """Kinds of day names a monitored variable can use."""

  # Name that matches none of the supported formats.
  UNKNOWN = 0
  # The literal name `yesterday`.
  YESTERDAY = 1
  # A full weekday name, e.g. `monday`.
  WEEKDAY = 2
  # A relative name, e.g. `8d_ago` or `3days_ago`.
  DAYS_AGO = 3
_EMPTY_SENSOR_ATTRIBUTE = {
'date': None,
'bedtime_start_hour': None,
'bedtime_end_hour': None,
'breath_average': None,
'temperature_delta': None,
'resting_heart_rate': None,
'heart_rate_average': None,
'deep_sleep_duration': None,
'rem_sleep_duration': None,
'light_sleep_duration': None,
'total_sleep_duration': None,
'awake_duration': None,
'in_bed_duration': None,
}
async def setup(hass, config):
"""No set up required once token data is obtained."""
return True
def setup_platform(hass, config, add_devices, discovery_info=None):
  """Adds sensor platform to the list of platforms.

  Args:
    hass: Unused.
    config: Platform configuration handed to the new sensor.
    add_devices: Callback receiving the list of entities to register.
    discovery_info: Unused.
  """
  # `setup` is a coroutine function that only returns True. Calling it here
  # without awaiting never ran it and just produced a "coroutine was never
  # awaited" RuntimeWarning, so it is deliberately not called.
  # The second argument is passed through unchanged; presumably it requests
  # an update before the entity is added (confirm against Home Assistant).
  add_devices([OuraSleepSensor(config)], True)
def _seconds_to_hours(time_in_seconds):
"""Parses times in seconds and converts it to hours."""
return round(int(time_in_seconds) / (60 * 60), 2)
def _add_days_to_string_date(string_date, days_to_add):
"""Adds (or subtracts) days from a string date.
Args:
string_date: Original date in YYYY-MM-DD.
days_to_add: Number of days to add. Negative to subtract.
Returns:
Date in YYYY-MM-DD with days added.
"""
date = datetime.datetime.strptime(string_date, '%Y-%m-%d')
new_date = date + datetime.timedelta(days=days_to_add)
return str(new_date.date())
class OuraSleepSensor(entity.Entity):
  """Representation of an Oura Ring sleep sensor.

  The state holds the sleep score of the first monitored day. The attributes
  hold one dictionary of sleep metrics per monitored day name.
  """

  # Matches the leading digits of names such as `8d_ago` or `3days_ago`.
  _LEADING_DIGITS_REGEX = re.compile(r'\d+')

  def __init__(self, config):
    """Initialize the sensor.

    Args:
      config: Platform configuration; read for the sensor name, the backfill
        flag, the client credentials and the monitored day names.
    """
    # Metadata.
    self._name = config.get(const.CONF_NAME)
    self._backfill = config.get(_CONF_BACKFILL)
    self._api = OuraApi(
        config.get(_CONF_CLIENT_ID),
        config.get(_CONF_CLIENT_SECRET))
    # Day names are matched in lower case everywhere else in this class.
    self._monitored_days = [
        date_name.lower()
        for date_name in config.get(const.CONF_MONITORED_VARIABLES)
    ]

    # Attributes.
    self._state = None  # Sleep score.
    self._attributes = {}

  # Oura update logic.
  def _get_date_type_by_name(self, date_name):
    """Gets the type of date format based in the date name.

    Args:
      date_name: Date for which to verify type.

    Returns:
      Date type (MonitoredDayType).
    """
    if date_name == 'yesterday':
      return MonitoredDayType.YESTERDAY
    if date_name in _FULL_WEEKDAY_NAMES:
      return MonitoredDayType.WEEKDAY
    if 'd_ago' in date_name or 'days_ago' in date_name:
      return MonitoredDayType.DAYS_AGO
    return MonitoredDayType.UNKNOWN

  def _get_date_by_name(self, date_name):
    """Translate a date name into YYYY-MM-DD format for the given day.

    Args:
      date_name: Name of the date to get. Supported:
        yesterday, weekday (e.g. monday, wednesday), Xdays_ago (e.g. 3days_ago).

    Returns:
      Date in YYYY-MM-DD format. Names that cannot be resolved are logged and
      resolved as yesterday.
    """
    date_type = self._get_date_type_by_name(date_name)
    today = datetime.date.today()
    days_ago = None

    if date_type == MonitoredDayType.YESTERDAY:
      days_ago = 1
    elif date_type == MonitoredDayType.WEEKDAY:
      date_index = _FULL_WEEKDAY_NAMES.index(date_name)
      # Most recent past occurrence of that weekday (1 to 7 days back);
      # today's own weekday resolves to a full week ago.
      days_ago = (today.weekday() - date_index) % 7 or 7
    elif date_type == MonitoredDayType.DAYS_AGO:
      # The pattern only matches digits, so int() cannot fail here.
      digits_match = self._LEADING_DIGITS_REGEX.match(date_name)
      if digits_match:
        days_ago = int(digits_match.group())

    if days_ago is None:
      logging.error(f'Oura: Unknown day name `{date_name}`, using yesterday.')
      days_ago = 1

    return str(today - datetime.timedelta(days=days_ago))

  def _get_backfill_date(self, date_name, date_value):
    """Gets the backfill date for a given date and date name.

    Args:
      date_name: Date name to backfill.
      date_value: Last checked value.

    Returns:
      Potential backfill date. None if Unknown.
    """
    date_type = self._get_date_type_by_name(date_name)
    if date_type == MonitoredDayType.WEEKDAY:
      # Stay on the same weekday: go back a whole week.
      return _add_days_to_string_date(date_value, -7)
    if date_type in (MonitoredDayType.YESTERDAY, MonitoredDayType.DAYS_AGO):
      return _add_days_to_string_date(date_value, -1)
    return None

  def _parse_sleep_data(self, oura_data):
    """Processes sleep data into a dictionary.

    Args:
      oura_data: Sleep data in list format from Oura API.

    Returns:
      Dictionary where key is the requested summary_date and value is the
      Oura sleep data for that given day. Empty if there is no usable data.
    """
    if not oura_data or 'sleep' not in oura_data:
      logging.error('Couldn\'t fetch data for Oura ring sensor.')
      return {}

    sleep_data = oura_data.get('sleep')
    if not sleep_data:
      return {}

    # Entries without a summary date cannot be matched to a day; skip them.
    return {
        sleep_daily_data.get('summary_date'): sleep_daily_data
        for sleep_daily_data in sleep_data
        if sleep_daily_data.get('summary_date')
    }

  def _find_sleep_entry(self, sleep_data, date_name, date_value, start_date):
    """Finds the sleep entry of a day, backfilling from older dates if enabled.

    Args:
      sleep_data: Sleep entries keyed by YYYY-MM-DD summary date.
      date_name: Monitored day name being resolved.
      date_value: Date (YYYY-MM-DD) the day name currently maps to.
      start_date: Oldest date (YYYY-MM-DD) that was requested.

    Returns:
      Tuple with the sleep entry (None if not found) and the date it is for.
    """
    sleep = sleep_data.get(date_value)
    backfill = 0
    while (
        not sleep
        and self._backfill
        and date_value >= start_date
        and backfill < _MAX_BACKFILL):
      last_date_value = date_value
      date_value = self._get_backfill_date(date_name, date_value)
      if not date_value:
        break

      logging.error(
          f'Unable to read Oura data for {date_name} ({last_date_value}). '
          f'Fetching {date_value} instead.')
      sleep = sleep_data.get(date_value)
      backfill += 1

    return sleep, date_value

  @staticmethod
  def _to_hour_and_minute(timestamp):
    """Formats a timestamp string as HH:MM. None if there is no timestamp."""
    if not timestamp:
      return None
    return parser.parse(timestamp).strftime('%H:%M')

  @staticmethod
  def _to_rounded_int(value):
    """Rounds a number to the nearest integer. None if there is no value."""
    if value is None:
      return None
    return int(round(value, 0))

  @staticmethod
  def _to_hours(seconds):
    """Converts seconds into hours. None if there is no value."""
    if seconds is None:
      return None
    return _seconds_to_hours(seconds)

  def _build_attributes(self, sleep, date_value):
    """Builds the attributes of one monitored day from its sleep entry.

    Fields absent from the entry are reported as None instead of raising.

    Args:
      sleep: Sleep entry (dictionary) for the day.
      date_value: Date (YYYY-MM-DD) the entry belongs to.

    Returns:
      Dictionary with the same keys as the empty attribute template.
    """
    # Defaults to an empty list: a missing `hr_5min` previously reached
    # sum() as the integer 0 and raised a TypeError.
    heart_rates = sleep.get('hr_5min') or []
    heart_rate_average = int(round(
        sum(heart_rates) / (len(heart_rates) or 1), 0))

    return {
        'date': date_value,
        # HH:MM at which you went bed.
        'bedtime_start_hour': self._to_hour_and_minute(
            sleep.get('bedtime_start')),
        # HH:MM at which you woke up.
        'bedtime_end_hour': self._to_hour_and_minute(
            sleep.get('bedtime_end')),
        # Breaths / minute.
        'breath_average': self._to_rounded_int(sleep.get('breath_average')),
        # Temperature deviation in Celsius.
        'temperature_delta': sleep.get('temperature_delta'),
        # Beats / minute (lowest).
        'resting_heart_rate': sleep.get('hr_lowest'),
        # Avg. beats / minute.
        'heart_rate_average': heart_rate_average,
        # Hours in deep sleep.
        'deep_sleep_duration': self._to_hours(sleep.get('deep')),
        # Hours in REM sleep.
        'rem_sleep_duration': self._to_hours(sleep.get('rem')),
        # Hours in light sleep.
        'light_sleep_duration': self._to_hours(sleep.get('light')),
        # Hours sleeping: deep + rem + light.
        'total_sleep_duration': self._to_hours(sleep.get('total')),
        # Hours awake.
        'awake_duration': self._to_hours(sleep.get('awake')),
        # Hours in bed: sleep + awake.
        'in_bed_duration': self._to_hours(sleep.get('duration')),
    }

  def update(self):
    """Fetches new state data for the sensor."""
    sleep_dates = {
        date_name: self._get_date_by_name(date_name)
        for date_name in self._monitored_days
    }
    if not sleep_dates:
      # Nothing is monitored; min() and max() below would raise.
      return

    # Add an extra week to retrieve past week in case current week data is
    # missing.
    start_date = _add_days_to_string_date(min(sleep_dates.values()), -7)
    end_date = max(sleep_dates.values())

    oura_data = self._api.get_sleep_data(start_date, end_date)
    sleep_data = self._parse_sleep_data(oura_data)
    if not sleep_data:
      return

    for date_name, date_value in sleep_dates.items():
      if date_name not in self._attributes:
        self._attributes[date_name] = dict(_EMPTY_SENSOR_ATTRIBUTE)
        self._attributes[date_name]['date'] = date_value

      # Check past dates to see if backfill is possible when missing data.
      sleep, date_value = self._find_sleep_entry(
          sleep_data, date_name, date_value, start_date)
      if not sleep:
        logging.error(f'Unable to read Oura data for {date_name}.')
        continue

      # The state gets the value of the sleep score for the first monitored day.
      if self._monitored_days.index(date_name) == 0:
        self._state = sleep.get('score')

      self._attributes[date_name] = self._build_attributes(sleep, date_value)

  # Hass.io properties.
  @property
  def name(self):
    """Return the name of the sensor."""
    return self._name

  @property
  def state(self):
    """Return the state of the sensor."""
    return self._state

  @property
  def device_state_attributes(self):
    """Return the sensor attributes (legacy property name)."""
    return self._attributes

  @property
  def extra_state_attributes(self):
    """Return the sensor attributes.

    Same data as `device_state_attributes`, exposed under the property name
    used by newer Home Assistant releases.
    """
    return self._attributes
class OuraApi(object):
  """Handles Oura API interactions.

  Tokens are read from and written to the token cache file; the access token
  is renewed with the refresh token when a request is rejected.
  """

  # Seconds to wait for an HTTP response before giving up, so that a stalled
  # connection cannot block the caller forever.
  _REQUEST_TIMEOUT = 30

  def __init__(self, client_id, client_secret):
    """Instantiates a new OuraApi class.

    Tokens are not loaded here; they are read from the token file on the
    first data request.

    Args:
      client_id: Client id sent when refreshing the access token.
      client_secret: Client secret sent when refreshing the access token.
    """
    self._client_id = client_id
    self._client_secret = client_secret
    self._access_token = None
    self._refresh_token = None

  def get_sleep_data(self, start_date, end_date=None):
    """Fetches data for a sleep OuraEndpoint and date.

    Args:
      start_date: Day for which to fetch data (YYYY-MM-DD).
      end_date: Last day for which to retrieve data (YYYY-MM-DD).
        If same as start_date, leave empty.

    Returns:
      Dictionary containing Oura sleep data. None if it could not be fetched.
    """
    if not self._access_token:
      self._get_access_token_data_from_file()

    if not self._access_token and not self._refresh_token:
      # Without any token every attempt is bound to be rejected.
      logging.error(
          'Couldn\'t fetch data for Oura ring sensor. Verify API token.')
      return None

    for _ in range(_MAX_API_RETRIES):
      # Built on every attempt: the access token may have been refreshed.
      # `end_date` is forwarded; it used to be accepted but silently dropped.
      api_url = self._get_api_endpoint(
          OuraEndpoints.SLEEP, start_date=start_date, end_date=end_date)

      try:
        response = requests.get(api_url, timeout=self._REQUEST_TIMEOUT)
        response_data = response.json()
      except (requests.RequestException, ValueError) as error:
        # Only the error type is logged: the message may embed the request
        # URL, which contains the access token.
        logging.error(
            'Oura API request failed (%s).', type(error).__name__)
        continue

      if not response_data:
        continue

      unauthorized = (
          response.status_code == 401
          or (isinstance(response_data, dict)
              and response_data.get('message') == 'Unauthorized'))
      if unauthorized:
        self._refresh_access_token()
        continue

      return response_data

    logging.error(
        'Couldn\'t fetch data for Oura ring sensor. Verify API token.')
    return None

  def _get_api_endpoint(self, api_endpoint, **kwargs):
    """Gets URL for a given endpoint and day.

    Args:
      api_endpoint: Endpoint (OuraEndpoints) from which to fetch data.

    Optional Keyword Args:
      start_date: Day for which to fetch data (YYYY-MM-DD).
        > Required for SLEEP endpoint.
      end_date: Day to which to fetch data (YYYY-MM-DD).
        > Optional, used for SLEEP endpoint.

    Returns:
      Full Oura API endpoint URL.
    """
    if api_endpoint == OuraEndpoints.TOKEN:
      # The token endpoint takes no access token in its URL.
      return api_endpoint.value

    api_url = f'{api_endpoint.value}?access_token={self._access_token}'
    if api_endpoint == OuraEndpoints.SLEEP:
      start_date = kwargs.get('start_date')
      if start_date:
        api_url += f'&start={start_date}'
      end_date = kwargs.get('end_date')
      if end_date:
        api_url += f'&end={end_date}'

    return api_url

  def _get_access_token_data_from_file(self):
    """Gets credentials data from the credentials file.

    Leaves the current tokens untouched when the file is missing or cannot
    be read as a JSON dictionary.
    """
    if not os.path.isfile(_TOKEN_FILE):
      # TODO: add support for more than one sensor based in sensor name.
      # TODO: add workflow to get first access and refresh token.
      logging.error('Unable to find Oura credentials file.')
      return

    try:
      with open(_TOKEN_FILE, 'r') as token_file:
        token_data = json.load(token_file) or {}
    except (OSError, ValueError):
      # ValueError covers malformed JSON (json.JSONDecodeError).
      logging.error('Unable to read Oura credentials file.')
      return

    if not isinstance(token_data, dict):
      logging.error('Unable to read Oura credentials file.')
      return

    self._access_token = token_data.get('access_token')
    self._refresh_token = token_data.get('refresh_token')

  def _store_access_token_data_to_file(self, access_token_data):
    """Stores credentials data to the credentials file.

    Args:
      access_token_data: Credentials data in a dictionary.
        Should contain at least access_token and refresh_token.
    """
    if not access_token_data or 'access_token' not in access_token_data:
      logging.error('Oura Access token not provided.')
      return

    if 'refresh_token' not in access_token_data:
      logging.error('Oura Refresh token not provided.')
      return

    try:
      with open(_TOKEN_FILE, 'w') as token_file:
        json.dump(access_token_data, token_file)
    except OSError:
      # The caller keeps the tokens in memory, so the current session can
      # carry on even though the cache could not be updated.
      logging.error('Unable to write Oura credentials file.')

  def _refresh_access_token(self):
    """Gets a new access token using refresh token."""
    if not self._refresh_token:
      # Nothing to exchange for a new token; skip the pointless request.
      logging.error('Oura API was unable to retrieve new API token.')
      return

    access_token_data = self._request_access_token_with_refresh_token()
    if (not isinstance(access_token_data, dict)
        or 'access_token' not in access_token_data):
      logging.error('Oura API was unable to retrieve new API token.')
      return

    # Keep the current refresh token when the response does not carry one.
    access_token_data.setdefault('refresh_token', self._refresh_token)

    self._store_access_token_data_to_file(access_token_data)
    self._access_token = access_token_data['access_token']
    self._refresh_token = access_token_data['refresh_token']

  def _request_access_token_with_refresh_token(self):
    """Sends a request to get access token with the existing refresh token.

    Returns:
      Response from requesting new token.
      Most likely: Access and refresh token data in a dictionary.
      Empty dictionary if the request failed or did not return JSON.
    """
    request_auth = requests.auth.HTTPBasicAuth(self._client_id,
                                               self._client_secret)
    request_data = {
        'grant_type': 'refresh_token',
        'refresh_token': self._refresh_token,
    }
    request_url = self._get_api_endpoint(OuraEndpoints.TOKEN)

    try:
      response = requests.post(
          request_url,
          auth=request_auth,
          data=request_data,
          timeout=self._REQUEST_TIMEOUT)
      return response.json()
    except (requests.RequestException, ValueError) as error:
      logging.error(
          'Oura token refresh request failed (%s).', type(error).__name__)
      return {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment