Skip to content

Instantly share code, notes, and snippets.

@julian-klode
Last active May 27, 2021 07:45
Show Gist options
  • Save julian-klode/39417bd1dc736dcfa05964497a608d6e to your computer and use it in GitHub Desktop.
Save julian-klode/39417bd1dc736dcfa05964497a608d6e to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
#
# Copyright (C) 2020-2021 Julian Andres Klode <[email protected]>
#
# This Source Code Form is subject to the terms of the Mozilla
# Public License, v. 2.0. If a copy of the MPL was not distributed
# with this file, You can obtain one at
# https://mozilla.org/MPL/2.0/.
#
"""HR plotting script.
Usage: plot-hr.py [--png]
Plot HR of oura ring vs elite hrv export.
"""
from typing import (
Dict,
Iterator,
List,
Tuple,
NewType,
Union,
Optional,
)
import datetime
import matplotlib.pyplot as plt # type: ignore
import os
import sys
import math
import itertools
import numpy # type: ignore
from hrvanalysis import ( # type: ignore
get_time_domain_features,
remove_outliers,
interpolate_nan_values,
remove_ectopic_beats,
)
import json
# Type declarations
Time = datetime.datetime
Interval = NewType("Interval", int)
HR = NewType("HR", float)
RMSSD = NewType("RMSSD", float)
NaN = float("NaN")
# Time zone
TZ = datetime.datetime.now().astimezone().tzinfo
oura_start = datetime.datetime(2021, 1, 1, tzinfo=TZ)
def find_window(time: Time) -> Optional[Time]:
"""Todo: Speed me up"""
if time < oura_start:
return None
distance = time.timestamp() - oura_start.timestamp()
group = distance // 300 * 300
window = datetime.datetime.fromtimestamp(
group + oura_start.timestamp(), tz=time.tzinfo
)
return window
def windows(
times: List[Time], hrs: Union[List[Interval], List[HR]], interval: Interval
) -> Iterator[Tuple[Time, List[int]]]:
"""Yields (_, indices) tuples for windows of length 5"""
for group, indices in itertools.groupby(
range(len(hrs)), lambda k: find_window(times[k])
):
if group is None:
continue
yield group, list(indices)
def expand_rmssd5(
stamps: List[Time], rrs: List[Interval]
) -> Tuple[List[Time], List[RMSSD]]:
"""Calculate RMSSD over 5 minute intervals.
Takes a list of time stamps and the raw RR intervalls, and returns
a list of RMSSD values.
"""
outtimes = []
out = []
for group, indices in windows(stamps, rrs, Interval(5 * 60)):
window_rrs = [rrs[i] for i in indices]
time_domain_features = get_time_domain_features(window_rrs)
out.append(time_domain_features["rmssd"])
outtimes.append(group)
return outtimes, out
def expand_hr5(stamps: List[Time], hrs: List[HR]) -> Tuple[List[Time], List[HR]]:
"""Calculate RMSSD over 5 minute intervals.
Takes a list of time stamps and the raw RR intervalls, and returns
a list of RMSSD values.
"""
outtimes = []
out: List[HR] = []
for group, indices in windows(stamps, hrs, Interval(5 * 60)):
window_hrs = [hrs[i] for i in indices]
outtimes.append(group)
out.append(HR(numpy.median(window_hrs)))
return outtimes, out
def read_elite(path: str) -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]:
"""Read EliteHRV RR export file"""
times: List[Time] = []
hrs: List[HR] = []
rrs: List[Interval] = []
basename = os.path.basename(path)
stamp = datetime.datetime.strptime(basename, "%Y-%m-%d %H-%M-%S.txt").replace(
tzinfo=TZ
)
rr_intervals_list = [int(rr) for rr in open(path)]
time_domain_features = get_time_domain_features(rr_intervals_list)
print("Elite original:", time_domain_features["rmssd"])
# This remove outliers from signal
rr_intervals_without_outliers = remove_outliers(
rr_intervals=rr_intervals_list, low_rri=300, high_rri=2000
)
# This replace outliers nan values with linear interpolation
interpolated_rr_intervals = interpolate_nan_values(
rr_intervals=rr_intervals_without_outliers, interpolation_method="linear"
)
# This remove ectopic beats from signal
nn_intervals_list = remove_ectopic_beats(
rr_intervals=interpolated_rr_intervals, method="kamath"
)
# This replace ectopic beats nan values with linear interpolation
interpolated_nn_intervals = interpolate_nan_values(rr_intervals=nn_intervals_list)
for rr in interpolated_nn_intervals:
stamp += datetime.timedelta(milliseconds=rr)
hr = (1000 / rr) * 60
times.append(stamp)
hrs.append(hr)
rrs.append(rr)
time_domain_features = get_time_domain_features(rrs)
print("Elite:", time_domain_features["rmssd"])
times_h, hrs = expand_hr5(times, hrs)
times_r, rmssds = expand_rmssd5(times, rrs)
times = times_h
return times, hrs, rmssds, RMSSD(time_domain_features["rmssd"])
def read_oura(path: str) -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]:
"""Read oura ring export"""
with open(path) as fobj:
oura = json.load(fobj)
times: List[Time] = []
hrs: List[HR] = []
rmssds: List[RMSSD] = []
sleep = oura["sleep"][-1]
stamp = datetime.datetime.fromisoformat(sleep["bedtime_start"])
for hr5, hrv5 in zip(sleep["hr_5min"], sleep["rmssd_5min"]):
times.append(stamp)
hrs.append(HR(hr5) if hr5 else HR(NaN))
rmssds.append(RMSSD(hrv5) if hrv5 else RMSSD(NaN))
stamp += datetime.timedelta(seconds=300)
return times, hrs, rmssds, sleep["rmssd"]
def do_plot() -> None:
"""Main entry point"""
times1, hrs1, rmssds1, rmssd1 = read_oura(sys.argv[2])
global oura_start
oura_start = times1[0]
times, hrs, rmssds, rmssd = read_elite(sys.argv[1])
plt.gcf().clear()
# Plot the graphs
if "--cor" in sys.argv:
times_all = sorted(set(times + times1))
elite: Dict[Time, float] = {}
oura: Dict[Time, float] = {}
if "--hrs" in sys.argv:
elite, oura = dict(zip(times, hrs)), dict(zip(times1, hrs1))
else:
elite, oura = dict(zip(times, rmssds)), dict(zip(times1, rmssds1))
xs = []
ys = []
for time in times_all:
x = elite.get(time, NaN)
y = oura.get(time, NaN)
if math.isnan(x) or math.isnan(y):
continue
xs.append(x)
ys.append(y)
coef = numpy.polyfit(xs, ys, 1)
poly1d_fn = numpy.poly1d(coef)
cor = numpy.corrcoef(xs, ys)[0, 1] ** 2
plt.xlabel("Wahoo Tickr")
plt.ylabel("Oura")
plt.plot(xs, ys, "yo", alpha=0.5)
plt.plot(xs, numpy.poly1d((1, 0))(xs), "--", alpha=0.3)
plt.plot(
xs,
poly1d_fn(xs),
"-k",
label=f"Correlation y = {coef[1]:.3} + {coef[0]:.3}x, R²={cor:.3}",
)
elif "--hrs" in sys.argv:
plt.plot(times, hrs, "-", label="tickr hr", alpha=0.3)
plt.plot(times1, hrs1, "-", label="oura hr")
# Plot the averages
hrs1_filtered = [hr for hr in hrs1 if not math.isnan(hr)]
hr = sum(hrs) / len(hrs)
hr1 = sum(hrs1_filtered) / len(hrs1_filtered)
print("HR", hr, "HR1", hr1)
plt.plot(times, [hr for _ in times], "--", label="tickr hr average", alpha=0.5)
plt.plot(
times1, [hr1 for _ in times1], "--", label="oura hr average", alpha=0.5
)
# Plot the graphs
else:
plt.plot(times, rmssds, ".-", label="tickr hrv", alpha=0.3)
plt.plot(times1, rmssds1, ".-", label="oura hrv")
# Plot the averages
plt.plot(
times, [rmssd for _ in times], "--", label="tickr hrv average", alpha=0.5
)
plt.plot(
times1, [rmssd1 for _ in times1], "--", label="oura hrv average", alpha=0.5
)
plt.grid()
plt.legend()
plt.gcf().autofmt_xdate()
def main(argv: List[str]) -> None:
do_plot()
if "--out" in argv:
plt.savefig(sys.argv[sys.argv.index("--out") + 1], bbox_inches="tight", dpi=150)
else:
plt.show()
if __name__ == "__main__":
main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment