Skip to content

Instantly share code, notes, and snippets.

@philerooski
Created September 5, 2024 18:15
Show Gist options
  • Save philerooski/1678efbf1b1b3211105e62678d9bf021 to your computer and use it in GitHub Desktop.
Save philerooski/1678efbf1b1b3211105e62678d9bf021 to your computer and use it in GitHub Desktop.
"""
Run this script from within the unzipped directory `JMV_fitbit_dta`
Download zipped data here: https://www.synapse.org/Synapse:syn62667431
"""
import pandas as pd
import json
import os
def main():
"""
Orchestrates the business logic of the script. It identifies all files, parses metadata, loads data,
and writes the result to Parquet files based on data type.
"""
data_types = [
"estimated_oxygen_variation",
"active_zone_minutes",
"activity_goals",
"daily_readiness_user_properties",
"altitude",
"calories",
"demographic_vo2_max",
"distance",
"exercise",
"heart_rate",
"lightly_active_minutes",
"moderately_active_minutes",
"resting_heart_rate",
"sedentary_minutes",
"steps",
"swim_lengths_data",
"time_in_heart_rate_zones",
"very_active_minutes",
"computed_temperature",
"daily_heart_rate_variability_summary",
"daily_respiratory_rate_summary",
"device_temperature",
"heart_rate_variability_details",
"heart_rate_variability_histogram",
"respiratory_rate_summary",
"wrist_temperature",
"sleep",
"sleep_score",
]
for data_type in data_types:
files_info = get_files_info(data_type)
collate_and_write_to_parquet(files_info)
def get_files_info(data_type):
"""
Accepts a data_type as a parameter and returns a list of dictionaries, each with the keys:
- path: the path to the file
- data_type: the type of data, derived from the file name
- export_end_date: the date from the file name, or None if not applicable.
Uses the parse_file_metadata function to determine the data type and export_end_date from the file name.
Args:
data_type (str): The data type to search for.
Returns:
list: A list of dictionaries containing file metadata.
"""
files_info = []
for root, dirs, files in os.walk("."):
for file in files:
detected_data_type, export_end_date = parse_file_metadata(file)
# Check if the detected data type matches the given data type
if detected_data_type == data_type:
files_info.append(
{
"path": os.path.join(root, file),
"data_type": detected_data_type,
"export_end_date": export_end_date,
}
)
return files_info
def parse_file_metadata(file_name):
"""
Parses out the data_type and date of the file. The data type of a given file will be its prefix,
as delimited by a hyphen. Any file names whose prefix/data type contains spaces are modified
by removing trailing spaces and replacing other spaces with underscores. All data types are lowercased.
Args:
file_name (str): The name of the file.
Returns:
tuple: A tuple containing the data_type (str) and export_end_date (str or None).
"""
base_name = os.path.splitext(file_name)[0]
parts = base_name.split("-")
data_type = parts[0].strip().lower().replace(" ", "_")
export_end_date = "-".join(parts[1:]).strip() if len(parts) > 1 else None
return data_type, export_end_date
def collate_and_write_to_parquet(files_info):
"""
Collates file data and writes as Parquet.
Accepts a list of file metadata dictionaries, loads each file as a
pandas dataframe, adds file metadata (data_type and export_end_date) as columns,
and writes the resulting dataframe to a Parquet file
named "{data_type}.parquet". JSON files are normalized before processing.
Args:
files_info (list): A list of dictionaries containing file metadata.
"""
if not files_info:
return
data_type = files_info[0]["data_type"]
combined_df = pd.DataFrame()
for file_info in files_info:
file_path = file_info["path"]
data_type = file_info["data_type"]
export_end_date = file_info["export_end_date"]
if file_path.endswith(".csv"):
df = pd.read_csv(file_path)
elif file_path.endswith(".json"):
with open(file_path, "r") as json_file:
json_data = json.load(json_file)
df = pd.json_normalize(json_data, sep="_")
else:
continue
df["data_type"] = data_type
df["export_end_date"] = export_end_date
combined_df = pd.concat([combined_df, df], ignore_index=True)
if not combined_df.empty:
combined_df.to_parquet(f"parquet/{data_type}.parquet")
if __name__ == "__main__":
main()
"""
Run these tests from within the unzipped directory `JMV_fitbit_dta` like this:
```
python -m unittest test_convert_to_parquet.py
```
Be sure that `convert_to_parquet.py` is in the same directory.
Download zipped data here: https://www.synapse.org/Synapse:syn62667431
"""
import unittest
from convert_to_parquet import (
get_files_info,
parse_file_metadata
) # Adjust the import as needed if your script is located differently
# Mapping of data types to their expected counts based on the table provided
expected_counts = {
"estimated_oxygen_variation": 1018,
"active_zone_minutes": 37,
"activity_goals": 1,
"daily_readiness_user_properties": 1,
"altitude": 36,
"calories": 38,
"demographic_vo2_max": 4,
"distance": 37,
"exercise": 1,
"heart_rate": 1043,
"lightly_active_minutes": 38,
"moderately_active_minutes": 38,
"resting_heart_rate": 4,
"sedentary_minutes": 38,
"steps": 37,
"swim_lengths_data": 37,
"time_in_heart_rate_zones": 1043,
"very_active_minutes": 38,
"computed_temperature": 36,
"daily_heart_rate_variability_summary": 976,
"daily_respiratory_rate_summary": 975,
"device_temperature": 1028,
"heart_rate_variability_details": 1019,
"heart_rate_variability_histogram": 37,
"respiratory_rate_summary": 37,
"wrist_temperature": 956,
"sleep": 37,
"sleep_score": 1,
}
class TestGetFilesInfo(unittest.TestCase):
def test_get_files_info_counts(self):
"""
Tests that get_files_info returns a list of dictionaries with lengths matching
the expected counts from the provided table.
"""
for data_type, expected_count in expected_counts.items():
with self.subTest(data_type=data_type):
files_info = get_files_info(data_type)
self.assertEqual(
len(files_info),
expected_count,
f"Expected {expected_count} files for data type '{data_type}', got {len(files_info)}",
)
class TestParseFileMetadata(unittest.TestCase):
def test_parse_file_metadata(self):
"""
Tests the parse_file_metadata function using sample file names from the provided table.
Verifies that the data type and export end date are correctly extracted and formatted.
"""
test_cases = [
(
"estimated_oxygen_variation-2024-08-28.csv",
("estimated_oxygen_variation", "2024-08-28"),
),
(
"Active Zone Minutes - 2024-08-01.csv",
("active_zone_minutes", "2024-08-01"),
),
("Activity Goals.csv", ("activity_goals", None)),
(
"Daily Readiness User Properties - 2021-07-24.csv",
("daily_readiness_user_properties", "2021-07-24"),
),
("altitude-2024-08-06.json", ("altitude", "2024-08-06")),
("calories-2024-08-06.json", ("calories", "2024-08-06")),
(
"demographic_vo2_max-2024-07-22.json",
("demographic_vo2_max", "2024-07-22"),
),
("distance-2024-08-06.json", ("distance", "2024-08-06")),
("exercise-0.json", ("exercise", "0")),
("heart_rate-2024-08-28.json", ("heart_rate", "2024-08-28")),
(
"lightly_active_minutes-2024-08-06.json",
("lightly_active_minutes", "2024-08-06"),
),
(
"moderately_active_minutes-2024-08-06.json",
("moderately_active_minutes", "2024-08-06"),
),
(
"resting_heart_rate-2024-07-22.json",
("resting_heart_rate", "2024-07-22"),
),
("sedentary_minutes-2024-08-06.json", ("sedentary_minutes", "2024-08-06")),
("steps-2024-08-06.json", ("steps", "2024-08-06")),
("swim_lengths_data-2024-08-06.json", ("swim_lengths_data", "2024-08-06")),
(
"time_in_heart_rate_zones-2024-08-28.json",
("time_in_heart_rate_zones", "2024-08-28"),
),
(
"very_active_minutes-2024-08-06.json",
("very_active_minutes", "2024-08-06"),
),
(
"Computed Temperature - 2024-08-01.csv",
("computed_temperature", "2024-08-01"),
),
(
"Daily Heart Rate Variability Summary - 2024-08-27.csv",
("daily_heart_rate_variability_summary", "2024-08-27"),
),
(
"Daily Respiratory Rate Summary - 2024-08-27.csv",
("daily_respiratory_rate_summary", "2024-08-27"),
),
(
"Device Temperature - 2024-08-28.csv",
("device_temperature", "2024-08-28"),
),
(
"Heart Rate Variability Details - 2024-08-27.csv",
("heart_rate_variability_details", "2024-08-27"),
),
(
"Heart Rate Variability Histogram - 2024-08-01.csv",
("heart_rate_variability_histogram", "2024-08-01"),
),
(
"Respiratory Rate Summary - 2024-08-01.csv",
("respiratory_rate_summary", "2024-08-01"),
),
("Wrist Temperature - 2024-08-28.csv", ("wrist_temperature", "2024-08-28")),
("sleep-2024-08-06.json", ("sleep", "2024-08-06")),
("sleep_score.csv", ("sleep_score", None)),
]
for file_name, expected in test_cases:
with self.subTest(file_name=file_name):
result = parse_file_metadata(file_name)
self.assertEqual(result, expected, f"Failed for file: {file_name}")
if __name__ == "__main__":
unittest.main()
"""
Run this script from within the unzipped directory `JMV_fitbit_dta`.
Parquet files written by `convert_to_parquet.py` ought to be located
in a `parquet` directory. If data is not found for a data type, a
message will be printed.
A message does not necessarily mean there is
something wrong:
Some data types do not contain any actual data:
* Device Temperature
* Heart Rate Notifications Profile
* Heart Rate Notifications Alerts
* Daily Readiness User Properties
For data types `sleep_scores` and `activity_goals`, there is a bug in this
script which I was too lazy to fix. The script will report that there is no
data in the Parquet, but if you hand check these Parquet files you will find
the expected amount of records (1007 and 12, respectively).
Download zipped data here: https://www.synapse.org/Synapse:syn62667431
"""
import json
import pandas as pd
import os
from collections import defaultdict
from convert_to_parquet import parse_file_metadata # Adjust import if located differently
def verify_csv_to_parquet(directory):
"""
Verifies that the number of records grouped by `export_end_date` in the CSV files matches
the counts in the Parquet file for each detected data type.
Args:
directory (str): The directory containing the CSV files for the data types.
"""
# Dictionary to keep track of counts for each data type
csv_counts = defaultdict(lambda: defaultdict(int))
# Iterate over all files in the specified directory
for root, _, files in os.walk(directory):
for file in files:
if file.endswith('.csv'):
# Use parse_file_metadata to extract data_type and export_end_date
data_type, export_end_date = parse_file_metadata(file)
if data_type: # Only process files with a recognized data type
file_path = os.path.join(root, file)
df = pd.read_csv(file_path)
# Count records grouped by export_end_date
csv_counts[data_type][export_end_date] += len(df)
# Verify counts for each data type against corresponding Parquet files
for data_type, date_counts in csv_counts.items():
parquet_file = f"parquet/{data_type}.parquet"
if os.path.exists(parquet_file):
parquet_df = pd.read_parquet(parquet_file)
# Count records grouped by export_end_date in the Parquet file
parquet_counts = parquet_df.groupby('export_end_date').size().to_dict()
# Compare CSV counts with Parquet counts
for date, count in date_counts.items():
parquet_count = parquet_counts.get(date, 0)
if count != parquet_count:
print(f"Mismatch for {data_type} on {date}: CSV count = {count}, Parquet count = {parquet_count}")
else:
print(f"Parquet file for {data_type} not found. Expected file: {parquet_file}")
# Run verification on the current directory or specify the directory containing CSVs
verify_csv_to_parquet('.')
# List of JSON data types based on your table
json_data_types = [
"altitude", "calories", "demographic_vo2_max", "distance", "exercise",
"heart_rate", "lightly_active_minutes", "moderately_active_minutes",
"resting_heart_rate", "sedentary_minutes", "steps", "swim_lengths_data",
"time_in_heart_rate_zones", "very_active_minutes", "sleep"
]
def extract_identifiers_from_json(file_path):
"""
Extracts unique identifiers from JSON records in the given file.
Args:
file_path (str): The path to the JSON file.
Returns:
set: A set of unique identifiers from the JSON records.
"""
identifiers = set()
try:
with open(file_path, 'r') as json_file:
data = json.load(json_file)
# Check if data is a list of dictionaries
if isinstance(data, list):
for record in data:
if isinstance(record, dict):
# Check for the identifier fields and add them to the set
if 'dateTime' in record:
identifiers.add(record['dateTime'])
elif 'timestamp' in record:
identifiers.add(record['timestamp'])
elif 'logId' in record:
identifiers.add(record['logId'])
except json.JSONDecodeError:
print(f"Error reading JSON file: {file_path}")
return identifiers
def extract_identifiers_from_parquet(parquet_file):
"""
Extracts unique identifiers from the Parquet file.
Args:
parquet_file (str): The path to the Parquet file.
Returns:
set: A set of unique identifiers from the Parquet records.
"""
identifiers = set()
try:
df = pd.read_parquet(parquet_file)
# Check which identifier column exists and add values to the set
if 'dateTime' in df.columns:
identifiers.update(df['dateTime'].dropna().unique())
elif 'timestamp' in df.columns:
identifiers.update(df['timestamp'].dropna().unique())
elif 'logId' in df.columns:
identifiers.update(df['logId'].dropna().unique())
except Exception as e:
print(f"Error reading Parquet file: {parquet_file}. Error: {e}")
return identifiers
def verify_json_to_parquet(directory):
"""
Verifies that every JSON record from each data type has been written to Parquet by comparing
the unique identifiers from JSON files to those in the corresponding Parquet file.
Args:
directory (str): The directory containing the JSON files.
"""
# Dictionary to collect identifiers for each data type
json_identifiers = {data_type: set() for data_type in json_data_types}
# Collect JSON identifiers from all files
for root, _, files in os.walk(directory):
for file in files:
if file.endswith('.json'):
data_type, _ = parse_file_metadata(file)
if data_type in json_data_types:
file_path = os.path.join(root, file)
json_identifiers[data_type].update(extract_identifiers_from_json(file_path))
# Compare identifiers between JSON and Parquet
for data_type, identifiers in json_identifiers.items():
parquet_file = f"parquet/{data_type}.parquet"
if os.path.exists(parquet_file):
parquet_identifiers = extract_identifiers_from_parquet(parquet_file)
# Find missing identifiers
missing_identifiers = identifiers - parquet_identifiers
if missing_identifiers:
print(f"Missing records for {data_type}: {len(missing_identifiers)} identifiers not found in Parquet.")
else:
print(f"Parquet file for {data_type} not found. Expected file: {parquet_file}")
# Specify the directory containing the JSON files
json_directory = "." # Change this to the directory where your JSON files are located
# Run the verification
verify_json_to_parquet(json_directory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment