Last active
November 26, 2023 14:47
-
-
Save kongmunist/c03db87981243a723bff5f76b0620c6e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import os | |
import re | |
# We are loading in a data export from Fitbit.
# The code below walks three directory levels under raw/:
#   raw/<first dir>/<first dir>/<one dir per data category>/<json and csv files>
allExports = "raw/"

# os.listdir() returns entries in arbitrary order, so sort before taking the
# first directory; otherwise the export picked can differ from run to run.
exportRoots = sorted(allExports + x for x in os.listdir(allExports) if os.path.isdir(allExports + x))
if not exportRoots:
    raise FileNotFoundError(f"No export directory found inside {allExports!r}")
innerDirs = sorted(x for x in os.listdir(exportRoots[0]) if os.path.isdir(exportRoots[0] + "/" + x))
if not innerDirs:
    raise FileNotFoundError(f"No sub-directory found inside {exportRoots[0]!r}")
curExport = exportRoots[0] + "/" + innerDirs[0]
print(f"Data export we're converting is \n\t{curExport}")

# In the export, there are a bunch of directories. We are interested in the
# ones that have jsons or csvs in them. They will have common prefixes.
curExportDirs = sorted(curExport + "/" + x for x in os.listdir(curExport) if os.path.isdir(curExport + "/" + x))

# List every directory once and derive the per-directory counts from that
# listing. alljscsv[i] holds the data file names found in curExportDirs[i].
dataSuffixes = (".json", ".csv")
alljscsv = [sorted(x for x in os.listdir(y) if x.endswith(dataSuffixes)) for y in curExportDirs]
numFiles = [len(files) for files in alljscsv]

# check how many jsons or csvs in each dir
print("Number of jsons or csvs in each dir:")
for dirPath, count in zip(curExportDirs, numFiles):
    print(f"\t{dirPath.split('/')[-1]}: {count}")
# There are many naming formats in the data export.
# If a file name contains " - ", we split on it and the first part becomes the name of the
# converted data file, e.g. "Daily Heart Rate Variability Summary - 2021-11-18.csv"
# If there is no space but there is a dash, we split before the first digit,
# e.g. "time_in_heart_rate_zones-2021-10-07.json"
# Anything else is treated as a single, un-numbered data file.
def fnameSplitter(fname):
    """Split a data file name into its series name and its date/number suffix.

    Args:
        fname: Bare file name including its extension.

    Returns:
        A 2-tuple ``(name, suffix)`` when the file name carries a date or
        number part, otherwise a 1-element list ``[stem]`` holding the file
        name without its extension. Callers tell the cases apart with
        ``len()``.
    """
    if " - " in fname:
        name, rest = fname.split(" - ")[:2]
        return name.strip(), rest.split(".")[0].strip()

    if " " not in fname and "-" in fname:
        # Group 1 is the leading run of non-digits (including the separator
        # right before the first digit); group 2 is everything from the first
        # digit onwards.
        prefix, rest = re.search(r"(\D*)(.*)", fname).groups()
        # With no digit at all, group 1 swallows the whole name (extension
        # included) and there is nothing to split on, so fall through to the
        # single-file case instead of returning a mangled name.
        if rest:
            # [:-1] drops the separator character that precedes the digits.
            return prefix[:-1].strip(), rest.split(".")[0].strip()

    # Names with a space but no " - ", names without space or dash, and
    # dashed names without digits: no split.
    return [fname.rsplit(".", 1)[0]]
# For each data export type in curExportDirs (Nutrition, Sleep, Other, etc.), we group the
# JSON and CSV file names by data series. ldict is a list with one dict per directory,
# in the same order as curExportDirs, shaped like this:
# ldict = [
#     {
#         "Nutrition_food_logs": ['food_logs-500.json', 'food_logs-400.json', ...],
#         "Nutrition_water_logs": ['water_logs-500.json', 'water_logs-400.json', ...],
#     },
#     {
#         "Stress_Stress Score": ["Stress Score.csv"],
#     },
#     ...
# ]
ldict = []
for eType, dataFiles in zip(curExportDirs, alljscsv):
    eTypeName = eType.split("/")[-1]
    print(eTypeName)
    d = dict()
    ldict.append(d)
    for jscsv in dataFiles:
        fname = fnameSplitter(jscsv)
        # 1 part: single data file, no numbers. 2 parts: numbered / dated file.
        if len(fname) not in (1, 2):
            print("ERROR: ", fname)
            continue
        # Always append rather than assign, so a file whose key was already
        # seen in this directory is added to the group instead of silently
        # replacing the files collected so far.
        d.setdefault(eTypeName + "_" + fname[0], []).append(jscsv)

# sanity check, print (directory name, number of keys), sorted by number of keys
typeCounts = sorted(
    ((eType.split("/")[-1], len(group)) for eType, group in zip(curExportDirs, ldict)),
    key=lambda pair: pair[1],
)
print("Unique data types in each subdir")
for entry in typeCounts:
    print(entry)
# Dict has all data files, categorized by name. Import each one and save it as a csv in cooked/
# nots/ is for files that don't have a timestamp column
os.makedirs("cooked/nots", exist_ok=True)
# Column names treated as timestamps; when several are present, the one that
# appears earliest in this list is used.
tsColNames = ["timestamp", "logDate", "dateTime", 'originalStartTime', "DATE", "startTime", "sleep_start", "recorded_time"]
for exportDir, d in zip(curExportDirs, ldict):
    for k, fnames in d.items():
        print(k)

        # Load all the files into a list of dataframes
        acc = []
        for fname in fnames:
            path = exportDir + "/" + fname
            if fname.endswith(".csv"):
                acc.append(pd.read_csv(path))
            elif fname.endswith(".json"):
                acc.append(pd.read_json(path))
            # Any other suffix is skipped rather than re-appending the
            # previously loaded frame.

        # Concatenate all the dataframes into one
        df = pd.concat(acc)

        # No timestamp column: save as-is under cooked/nots/
        tsCol = next((x for x in tsColNames if x in df.columns), None)
        if tsCol is None:
            df.to_csv("cooked/nots/" + k + ".csv")
            print(f"\tSaved nots/{k}.csv")
            continue

        # Parse the timestamp column and sort by it. If the default parse
        # fails, retry with format="mixed", which infers the format for each
        # value individually.
        try:
            df[tsCol] = pd.to_datetime(df[tsCol])
        except (ValueError, TypeError):
            df[tsCol] = pd.to_datetime(df[tsCol], format="mixed")
        df.sort_values(by=tsCol, inplace=True)

        # Save the dataframe as a CSV
        df.to_csv("cooked/" + k + ".csv")
        print(f"\tSaved {k}.csv")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment