Last active
May 27, 2021 07:45
-
-
Save julian-klode/39417bd1dc736dcfa05964497a608d6e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# Copyright (C) 2020-2021 Julian Andres Klode <[email protected]> | |
# | |
# This Source Code Form is subject to the terms of the Mozilla | |
# Public License, v. 2.0. If a copy of the MPL was not distributed | |
# with this file, You can obtain one at | |
# https://mozilla.org/MPL/2.0/. | |
# | |
"""HR plotting script. | |
Usage: plot-hr.py [--png] | |
Plot HR of oura ring vs elite hrv export. | |
""" | |
from typing import ( | |
Dict, | |
Iterator, | |
List, | |
Tuple, | |
NewType, | |
Union, | |
Optional, | |
) | |
import datetime | |
import matplotlib.pyplot as plt # type: ignore | |
import os | |
import sys | |
import math | |
import itertools | |
import numpy # type: ignore | |
from hrvanalysis import ( # type: ignore | |
get_time_domain_features, | |
remove_outliers, | |
interpolate_nan_values, | |
remove_ectopic_beats, | |
) | |
import json | |
# Type declarations | |
Time = datetime.datetime | |
Interval = NewType("Interval", int) | |
HR = NewType("HR", float) | |
RMSSD = NewType("RMSSD", float) | |
NaN = float("NaN") | |
# Time zone | |
TZ = datetime.datetime.now().astimezone().tzinfo | |
oura_start = datetime.datetime(2021, 1, 1, tzinfo=TZ) | |
def find_window(time: Time) -> Optional[Time]: | |
"""Todo: Speed me up""" | |
if time < oura_start: | |
return None | |
distance = time.timestamp() - oura_start.timestamp() | |
group = distance // 300 * 300 | |
window = datetime.datetime.fromtimestamp( | |
group + oura_start.timestamp(), tz=time.tzinfo | |
) | |
return window | |
def windows( | |
times: List[Time], hrs: Union[List[Interval], List[HR]], interval: Interval | |
) -> Iterator[Tuple[Time, List[int]]]: | |
"""Yields (_, indices) tuples for windows of length 5""" | |
for group, indices in itertools.groupby( | |
range(len(hrs)), lambda k: find_window(times[k]) | |
): | |
if group is None: | |
continue | |
yield group, list(indices) | |
def expand_rmssd5( | |
stamps: List[Time], rrs: List[Interval] | |
) -> Tuple[List[Time], List[RMSSD]]: | |
"""Calculate RMSSD over 5 minute intervals. | |
Takes a list of time stamps and the raw RR intervalls, and returns | |
a list of RMSSD values. | |
""" | |
outtimes = [] | |
out = [] | |
for group, indices in windows(stamps, rrs, Interval(5 * 60)): | |
window_rrs = [rrs[i] for i in indices] | |
time_domain_features = get_time_domain_features(window_rrs) | |
out.append(time_domain_features["rmssd"]) | |
outtimes.append(group) | |
return outtimes, out | |
def expand_hr5(stamps: List[Time], hrs: List[HR]) -> Tuple[List[Time], List[HR]]: | |
"""Calculate RMSSD over 5 minute intervals. | |
Takes a list of time stamps and the raw RR intervalls, and returns | |
a list of RMSSD values. | |
""" | |
outtimes = [] | |
out: List[HR] = [] | |
for group, indices in windows(stamps, hrs, Interval(5 * 60)): | |
window_hrs = [hrs[i] for i in indices] | |
outtimes.append(group) | |
out.append(HR(numpy.median(window_hrs))) | |
return outtimes, out | |
def read_elite(path: str) -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]: | |
"""Read EliteHRV RR export file""" | |
times: List[Time] = [] | |
hrs: List[HR] = [] | |
rrs: List[Interval] = [] | |
basename = os.path.basename(path) | |
stamp = datetime.datetime.strptime(basename, "%Y-%m-%d %H-%M-%S.txt").replace( | |
tzinfo=TZ | |
) | |
rr_intervals_list = [int(rr) for rr in open(path)] | |
time_domain_features = get_time_domain_features(rr_intervals_list) | |
print("Elite original:", time_domain_features["rmssd"]) | |
# This remove outliers from signal | |
rr_intervals_without_outliers = remove_outliers( | |
rr_intervals=rr_intervals_list, low_rri=300, high_rri=2000 | |
) | |
# This replace outliers nan values with linear interpolation | |
interpolated_rr_intervals = interpolate_nan_values( | |
rr_intervals=rr_intervals_without_outliers, interpolation_method="linear" | |
) | |
# This remove ectopic beats from signal | |
nn_intervals_list = remove_ectopic_beats( | |
rr_intervals=interpolated_rr_intervals, method="kamath" | |
) | |
# This replace ectopic beats nan values with linear interpolation | |
interpolated_nn_intervals = interpolate_nan_values(rr_intervals=nn_intervals_list) | |
for rr in interpolated_nn_intervals: | |
stamp += datetime.timedelta(milliseconds=rr) | |
hr = (1000 / rr) * 60 | |
times.append(stamp) | |
hrs.append(hr) | |
rrs.append(rr) | |
time_domain_features = get_time_domain_features(rrs) | |
print("Elite:", time_domain_features["rmssd"]) | |
times_h, hrs = expand_hr5(times, hrs) | |
times_r, rmssds = expand_rmssd5(times, rrs) | |
times = times_h | |
return times, hrs, rmssds, RMSSD(time_domain_features["rmssd"]) | |
def read_oura(path: str) -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]: | |
"""Read oura ring export""" | |
with open(path) as fobj: | |
oura = json.load(fobj) | |
times: List[Time] = [] | |
hrs: List[HR] = [] | |
rmssds: List[RMSSD] = [] | |
sleep = oura["sleep"][-1] | |
stamp = datetime.datetime.fromisoformat(sleep["bedtime_start"]) | |
for hr5, hrv5 in zip(sleep["hr_5min"], sleep["rmssd_5min"]): | |
times.append(stamp) | |
hrs.append(HR(hr5) if hr5 else HR(NaN)) | |
rmssds.append(RMSSD(hrv5) if hrv5 else RMSSD(NaN)) | |
stamp += datetime.timedelta(seconds=300) | |
return times, hrs, rmssds, sleep["rmssd"] | |
def do_plot() -> None: | |
"""Main entry point""" | |
times1, hrs1, rmssds1, rmssd1 = read_oura(sys.argv[2]) | |
global oura_start | |
oura_start = times1[0] | |
times, hrs, rmssds, rmssd = read_elite(sys.argv[1]) | |
plt.gcf().clear() | |
# Plot the graphs | |
if "--cor" in sys.argv: | |
times_all = sorted(set(times + times1)) | |
elite: Dict[Time, float] = {} | |
oura: Dict[Time, float] = {} | |
if "--hrs" in sys.argv: | |
elite, oura = dict(zip(times, hrs)), dict(zip(times1, hrs1)) | |
else: | |
elite, oura = dict(zip(times, rmssds)), dict(zip(times1, rmssds1)) | |
xs = [] | |
ys = [] | |
for time in times_all: | |
x = elite.get(time, NaN) | |
y = oura.get(time, NaN) | |
if math.isnan(x) or math.isnan(y): | |
continue | |
xs.append(x) | |
ys.append(y) | |
coef = numpy.polyfit(xs, ys, 1) | |
poly1d_fn = numpy.poly1d(coef) | |
cor = numpy.corrcoef(xs, ys)[0, 1] ** 2 | |
plt.xlabel("Wahoo Tickr") | |
plt.ylabel("Oura") | |
plt.plot(xs, ys, "yo", alpha=0.5) | |
plt.plot(xs, numpy.poly1d((1, 0))(xs), "--", alpha=0.3) | |
plt.plot( | |
xs, | |
poly1d_fn(xs), | |
"-k", | |
label=f"Correlation y = {coef[1]:.3} + {coef[0]:.3}x, R²={cor:.3}", | |
) | |
elif "--hrs" in sys.argv: | |
plt.plot(times, hrs, "-", label="tickr hr", alpha=0.3) | |
plt.plot(times1, hrs1, "-", label="oura hr") | |
# Plot the averages | |
hrs1_filtered = [hr for hr in hrs1 if not math.isnan(hr)] | |
hr = sum(hrs) / len(hrs) | |
hr1 = sum(hrs1_filtered) / len(hrs1_filtered) | |
print("HR", hr, "HR1", hr1) | |
plt.plot(times, [hr for _ in times], "--", label="tickr hr average", alpha=0.5) | |
plt.plot( | |
times1, [hr1 for _ in times1], "--", label="oura hr average", alpha=0.5 | |
) | |
# Plot the graphs | |
else: | |
plt.plot(times, rmssds, ".-", label="tickr hrv", alpha=0.3) | |
plt.plot(times1, rmssds1, ".-", label="oura hrv") | |
# Plot the averages | |
plt.plot( | |
times, [rmssd for _ in times], "--", label="tickr hrv average", alpha=0.5 | |
) | |
plt.plot( | |
times1, [rmssd1 for _ in times1], "--", label="oura hrv average", alpha=0.5 | |
) | |
plt.grid() | |
plt.legend() | |
plt.gcf().autofmt_xdate() | |
def main(argv: List[str]) -> None: | |
do_plot() | |
if "--out" in argv: | |
plt.savefig(sys.argv[sys.argv.index("--out") + 1], bbox_inches="tight", dpi=150) | |
else: | |
plt.show() | |
if __name__ == "__main__": | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment