Created
May 25, 2021 09:15
-
-
Save julian-klode/f13cd75ccf9498b509b2b2982a831d98 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# Copyright (C) 2020-2021 Julian Andres Klode <[email protected]> | |
# | |
# This Source Code Form is subject to the terms of the Mozilla | |
# Public License, v. 2.0. If a copy of the MPL was not distributed | |
# with this file, You can obtain one at | |
# https://mozilla.org/MPL/2.0/. | |
# | |
"""HR plotting script. | |
Usage: plot-hr.py [--png] | |
Plot HR of oura ring vs elite hrv export. | |
""" | |
from typing import ( | |
Iterator, | |
List, | |
Tuple, | |
NewType, | |
) | |
import datetime | |
import matplotlib.pyplot as plt # type: ignore | |
import matplotlib.animation as animation # type: ignore | |
import sys | |
import itertools | |
from hrvanalysis import ( # type: ignore | |
get_time_domain_features, | |
remove_outliers, | |
interpolate_nan_values, | |
remove_ectopic_beats, | |
) | |
import json | |
# Type declarations | |
Time = datetime.datetime | |
Interval = NewType("Interval", int) | |
HR = NewType("HR", float) | |
RMSSD = NewType("RMSSD", float) | |
NaN = float("NaN") | |
def windows( | |
times: List[Time], hrs: List[Interval], interval: Interval | |
) -> Iterator[Tuple[float, Iterator[int]]]: | |
"""Yields (_, indices) tuples for windows of length 5""" | |
return itertools.groupby( | |
range(len(hrs)), lambda k: times[k].timestamp() // interval | |
) | |
def expand_rmssd5(stamps: List[Time], rrs: List[Interval]) -> List[RMSSD]: | |
"""Calculate RMSSD over 5 minute intervals. | |
Takes a list of time stamps and the raw RR intervalls, and returns | |
a list of RMSSD values. | |
""" | |
out = [] | |
for group, indices in windows(stamps, rrs, Interval(5 * 60)): | |
window_rrs = [rrs[i] for i in indices] | |
time_domain_features = get_time_domain_features(window_rrs) | |
for i in window_rrs: | |
out.append(time_domain_features["rmssd"]) | |
return out | |
def read_elite() -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]: | |
"""Read EliteHRV RR export file""" | |
times: List[Time] = [] | |
hrs: List[HR] = [] | |
rrs: List[Interval] = [] | |
stamp = datetime.datetime( | |
year=2021, | |
month=5, | |
day=24, | |
hour=23, | |
minute=4, | |
second=20, | |
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)), | |
) | |
rr_intervals_list = [int(rr) for rr in open("2021-05-24 23-04-20.txt")] | |
# This remove outliers from signal | |
rr_intervals_without_outliers = remove_outliers( | |
rr_intervals=rr_intervals_list, low_rri=300, high_rri=2000 | |
) | |
# This replace outliers nan values with linear interpolation | |
interpolated_rr_intervals = interpolate_nan_values( | |
rr_intervals=rr_intervals_without_outliers, interpolation_method="linear" | |
) | |
# This remove ectopic beats from signal | |
nn_intervals_list = remove_ectopic_beats( | |
rr_intervals=interpolated_rr_intervals, method="malik" | |
) | |
# This replace ectopic beats nan values with linear interpolation | |
interpolated_nn_intervals = interpolate_nan_values(rr_intervals=nn_intervals_list) | |
for rr in interpolated_nn_intervals: | |
stamp += datetime.timedelta(milliseconds=rr) | |
hr = 1.0 / rr | |
times.append(stamp) | |
hrs.append(hr) | |
rrs.append(rr) | |
time_domain_features = get_time_domain_features(rrs) | |
print("Elite:", time_domain_features["rmssd"]) | |
rmssds = expand_rmssd5(times, rrs) | |
return times, hrs, rmssds, RMSSD(time_domain_features["rmssd"]) | |
def read_oura() -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]: | |
"""Read oura ring export""" | |
with open("/home/jak/Downloads/oura_2021-05-25T07-45-08.json") as fobj: | |
oura = json.load(fobj) | |
times: List[Time] = [] | |
hrs: List[HR] = [] | |
rmssds: List[RMSSD] = [] | |
sleep = oura["sleep"][-1] | |
stamp = datetime.datetime.fromisoformat(sleep["bedtime_start"]) | |
for hr5, hrv5 in zip(sleep["hr_5min"], sleep["rmssd_5min"]): | |
for second in range(5 * 60): | |
times.append(stamp) | |
hrs.append(HR(hr5) if hr5 else HR(NaN)) | |
rmssds.append(RMSSD(hrv5) if hrv5 else RMSSD(NaN)) | |
stamp += datetime.timedelta(seconds=1) | |
return times, hrs, rmssds, sleep["rmssd"] | |
def do_plot(i: int) -> None: | |
"""Main entry point""" | |
times, hrs, rmssds, rmssd = read_elite() | |
times1, hrs1, rmssds1, rmssd1 = read_oura() | |
plt.gcf().clear() | |
# Plot the graphs | |
plt.plot(times1, rmssds1, "-", label="oura hrv") | |
plt.plot(times, rmssds, "-", label="tickr hrv", alpha=0.3) | |
# Plot the averages | |
plt.plot(times, [rmssd for _ in times], "--", label="tickr hrv average", alpha=0.5) | |
plt.plot( | |
times1, [rmssd1 for _ in times1], "--", label="oura hrv average", alpha=0.5 | |
) | |
plt.grid() | |
plt.legend() | |
plt.gcf().autofmt_xdate() | |
def main(argv: List[str]) -> None: | |
fig = plt.figure() | |
if "--png" in argv: | |
do_plot(0) | |
plt.savefig("hr.png", bbox_inches="tight", dpi=150) | |
else: | |
ani = animation.FuncAnimation(fig, do_plot, interval=10000) | |
plt.show() | |
if __name__ == "__main__": | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment