""" | |
Run this script from within the unzipped directory `JMV_fitbit_dta` | |
Download zipped data here: https://www.synapse.org/Synapse:syn62667431 | |
""" | |
import pandas as pd | |
import json | |
import os | |

def main():
    """
    Orchestrates the business logic of the script. It identifies all files,
    parses metadata, loads data, and writes the result to Parquet files based
    on data type.
    """
    data_types = [
        "estimated_oxygen_variation",
        "active_zone_minutes",
        "activity_goals",
        "daily_readiness_user_properties",
        "altitude",
        "calories",
        "demographic_vo2_max",
        "distance",
        "exercise",
        "heart_rate",
        "lightly_active_minutes",
        "moderately_active_minutes",
        "resting_heart_rate",
        "sedentary_minutes",
        "steps",
        "swim_lengths_data",
        "time_in_heart_rate_zones",
        "very_active_minutes",
        "computed_temperature",
        "daily_heart_rate_variability_summary",
        "daily_respiratory_rate_summary",
        "device_temperature",
        "heart_rate_variability_details",
        "heart_rate_variability_histogram",
        "respiratory_rate_summary",
        "wrist_temperature",
        "sleep",
        "sleep_score",
    ]
    for data_type in data_types:
        files_info = get_files_info(data_type)
        collate_and_write_to_parquet(files_info)

def get_files_info(data_type):
    """
    Accepts a data_type as a parameter and returns a list of dictionaries, each with the keys:

    - path: the path to the file
    - data_type: the type of data, derived from the file name
    - export_end_date: the date from the file name, or None if not applicable

    Uses the parse_file_metadata function to determine the data type and
    export_end_date from the file name.

    Args:
        data_type (str): The data type to search for.

    Returns:
        list: A list of dictionaries containing file metadata.
    """
    files_info = []
    for root, dirs, files in os.walk("."):
        for file in files:
            detected_data_type, export_end_date = parse_file_metadata(file)
            # Check if the detected data type matches the given data type
            if detected_data_type == data_type:
                files_info.append(
                    {
                        "path": os.path.join(root, file),
                        "data_type": detected_data_type,
                        "export_end_date": export_end_date,
                    }
                )
    return files_info
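
# For illustration, a single entry in the returned list looks like the
# following (the file name is taken from the accompanying test file; the
# directory layout is hypothetical and depends on how the export is
# organized on disk):
#
#     {
#         "path": "./Active Zone Minutes - 2024-08-01.csv",
#         "data_type": "active_zone_minutes",
#         "export_end_date": "2024-08-01",
#     }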

def parse_file_metadata(file_name):
    """
    Parses out the data_type and date of the file. The data type of a given
    file is its prefix, as delimited by a hyphen. Prefixes containing spaces
    are normalized by stripping surrounding whitespace and replacing the
    remaining spaces with underscores. All data types are lowercased.

    Args:
        file_name (str): The name of the file.

    Returns:
        tuple: A tuple containing the data_type (str) and export_end_date (str or None).
    """
    base_name = os.path.splitext(file_name)[0]
    parts = base_name.split("-")
    data_type = parts[0].strip().lower().replace(" ", "_")
    export_end_date = "-".join(parts[1:]).strip() if len(parts) > 1 else None
    return data_type, export_end_date
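
# For illustration (file names taken from the accompanying test file),
# parsing behaves like this:
#
#     parse_file_metadata("Active Zone Minutes - 2024-08-01.csv")
#     -> ("active_zone_minutes", "2024-08-01")
#     parse_file_metadata("Activity Goals.csv")
#     -> ("activity_goals", None)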

def collate_and_write_to_parquet(files_info):
    """
    Collates file data and writes it as Parquet.

    Accepts a list of file metadata dictionaries, loads each file as a
    pandas dataframe, adds the file metadata (data_type and export_end_date)
    as columns, and writes the resulting dataframe to a Parquet file named
    "{data_type}.parquet". JSON files are normalized before processing.

    Args:
        files_info (list): A list of dictionaries containing file metadata.
    """
    if not files_info:
        return
    data_type = files_info[0]["data_type"]
    combined_df = pd.DataFrame()
    for file_info in files_info:
        file_path = file_info["path"]
        data_type = file_info["data_type"]
        export_end_date = file_info["export_end_date"]
        if file_path.endswith(".csv"):
            df = pd.read_csv(file_path)
        elif file_path.endswith(".json"):
            with open(file_path, "r") as json_file:
                json_data = json.load(json_file)
            df = pd.json_normalize(json_data, sep="_")
        else:
            continue
        df["data_type"] = data_type
        df["export_end_date"] = export_end_date
        combined_df = pd.concat([combined_df, df], ignore_index=True)
    if not combined_df.empty:
        # Ensure the output directory exists before writing
        os.makedirs("parquet", exist_ok=True)
        combined_df.to_parquet(f"parquet/{data_type}.parquet")
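
# A minimal illustration of the json_normalize(..., sep="_") call above, using
# a hypothetical record rather than one taken from the Fitbit export:
#
#     pd.json_normalize([{"dateTime": "2024-08-06", "value": {"bpm": 60}}], sep="_")
#
# produces a dataframe with the flattened columns "dateTime" and "value_bpm".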

if __name__ == "__main__":
    main()
""" | |
Run these tests from within the unzipped directory `JMV_fitbit_dta` like this: | |
``` | |
python -m unittest test_convert_to_parquet.py | |
``` | |
Be sure that `convert_to_parquet.py` is in the same directory. | |
Download zipped data here: https://www.synapse.org/Synapse:syn62667431 | |
""" | |
import unittest | |
from convert_to_parquet import ( | |
get_files_info, | |
parse_file_metadata | |
) # Adjust the import as needed if your script is located differently | |
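
# As an aside (standard unittest command-line behavior rather than anything
# specific to this script), a single test class or test method can be run
# with, e.g.:
#
#     python -m unittest test_convert_to_parquet.TestParseFileMetadata -v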

# Mapping of data types to their expected counts based on the table provided
expected_counts = {
    "estimated_oxygen_variation": 1018,
    "active_zone_minutes": 37,
    "activity_goals": 1,
    "daily_readiness_user_properties": 1,
    "altitude": 36,
    "calories": 38,
    "demographic_vo2_max": 4,
    "distance": 37,
    "exercise": 1,
    "heart_rate": 1043,
    "lightly_active_minutes": 38,
    "moderately_active_minutes": 38,
    "resting_heart_rate": 4,
    "sedentary_minutes": 38,
    "steps": 37,
    "swim_lengths_data": 37,
    "time_in_heart_rate_zones": 1043,
    "very_active_minutes": 38,
    "computed_temperature": 36,
    "daily_heart_rate_variability_summary": 976,
    "daily_respiratory_rate_summary": 975,
    "device_temperature": 1028,
    "heart_rate_variability_details": 1019,
    "heart_rate_variability_histogram": 37,
    "respiratory_rate_summary": 37,
    "wrist_temperature": 956,
    "sleep": 37,
    "sleep_score": 1,
}

class TestGetFilesInfo(unittest.TestCase):
    def test_get_files_info_counts(self):
        """
        Tests that get_files_info returns a list of dictionaries with lengths
        matching the expected counts from the provided table.
        """
        for data_type, expected_count in expected_counts.items():
            with self.subTest(data_type=data_type):
                files_info = get_files_info(data_type)
                self.assertEqual(
                    len(files_info),
                    expected_count,
                    f"Expected {expected_count} files for data type '{data_type}', got {len(files_info)}",
                )

class TestParseFileMetadata(unittest.TestCase):
    def test_parse_file_metadata(self):
        """
        Tests the parse_file_metadata function using sample file names from the
        provided table. Verifies that the data type and export end date are
        correctly extracted and formatted.
        """
        test_cases = [
            (
                "estimated_oxygen_variation-2024-08-28.csv",
                ("estimated_oxygen_variation", "2024-08-28"),
            ),
            (
                "Active Zone Minutes - 2024-08-01.csv",
                ("active_zone_minutes", "2024-08-01"),
            ),
            ("Activity Goals.csv", ("activity_goals", None)),
            (
                "Daily Readiness User Properties - 2021-07-24.csv",
                ("daily_readiness_user_properties", "2021-07-24"),
            ),
            ("altitude-2024-08-06.json", ("altitude", "2024-08-06")),
            ("calories-2024-08-06.json", ("calories", "2024-08-06")),
            (
                "demographic_vo2_max-2024-07-22.json",
                ("demographic_vo2_max", "2024-07-22"),
            ),
            ("distance-2024-08-06.json", ("distance", "2024-08-06")),
            ("exercise-0.json", ("exercise", "0")),
            ("heart_rate-2024-08-28.json", ("heart_rate", "2024-08-28")),
            (
                "lightly_active_minutes-2024-08-06.json",
                ("lightly_active_minutes", "2024-08-06"),
            ),
            (
                "moderately_active_minutes-2024-08-06.json",
                ("moderately_active_minutes", "2024-08-06"),
            ),
            (
                "resting_heart_rate-2024-07-22.json",
                ("resting_heart_rate", "2024-07-22"),
            ),
            ("sedentary_minutes-2024-08-06.json", ("sedentary_minutes", "2024-08-06")),
            ("steps-2024-08-06.json", ("steps", "2024-08-06")),
            ("swim_lengths_data-2024-08-06.json", ("swim_lengths_data", "2024-08-06")),
            (
                "time_in_heart_rate_zones-2024-08-28.json",
                ("time_in_heart_rate_zones", "2024-08-28"),
            ),
            (
                "very_active_minutes-2024-08-06.json",
                ("very_active_minutes", "2024-08-06"),
            ),
            (
                "Computed Temperature - 2024-08-01.csv",
                ("computed_temperature", "2024-08-01"),
            ),
            (
                "Daily Heart Rate Variability Summary - 2024-08-27.csv",
                ("daily_heart_rate_variability_summary", "2024-08-27"),
            ),
            (
                "Daily Respiratory Rate Summary - 2024-08-27.csv",
                ("daily_respiratory_rate_summary", "2024-08-27"),
            ),
            (
                "Device Temperature - 2024-08-28.csv",
                ("device_temperature", "2024-08-28"),
            ),
            (
                "Heart Rate Variability Details - 2024-08-27.csv",
                ("heart_rate_variability_details", "2024-08-27"),
            ),
            (
                "Heart Rate Variability Histogram - 2024-08-01.csv",
                ("heart_rate_variability_histogram", "2024-08-01"),
            ),
            (
                "Respiratory Rate Summary - 2024-08-01.csv",
                ("respiratory_rate_summary", "2024-08-01"),
            ),
            ("Wrist Temperature - 2024-08-28.csv", ("wrist_temperature", "2024-08-28")),
            ("sleep-2024-08-06.json", ("sleep", "2024-08-06")),
            ("sleep_score.csv", ("sleep_score", None)),
        ]
        for file_name, expected in test_cases:
            with self.subTest(file_name=file_name):
                result = parse_file_metadata(file_name)
                self.assertEqual(result, expected, f"Failed for file: {file_name}")

if __name__ == "__main__":
    unittest.main()
""" | |
Run this script from within the unzipped directory `JMV_fitbit_dta`. | |
Parquet files written by `convert_to_parquet.py` ought to be located | |
in a `parquet` directory. If data is not found for a data type, a | |
message will be printed. | |
A message does not necessarily mean there is | |
something wrong: | |
Some data types do not contain any actual data: | |
* Device Temperature | |
* Heart Rate Notifications Profile | |
* Heart Rate Notifications Alerts | |
* Daily Readiness User Properties | |
For data types `sleep_scores` and `activity_goals`, there is a bug in this | |
script which I was too lazy to fix. The script will report that there is no | |
data in the Parquet, but if you hand check these Parquet files you will find | |
the expected amount of records (1007 and 12, respectively). | |
Download zipped data here: https://www.synapse.org/Synapse:syn62667431 | |
""" | |
import json | |
import pandas as pd | |
import os | |
from collections import defaultdict | |
from convert_to_parquet import parse_file_metadata # Adjust import if located differently | |

def verify_csv_to_parquet(directory):
    """
    Verifies that the number of records grouped by `export_end_date` in the CSV
    files matches the counts in the Parquet file for each detected data type.

    Args:
        directory (str): The directory containing the CSV files for the data types.
    """
    # Dictionary to keep track of counts for each data type
    csv_counts = defaultdict(lambda: defaultdict(int))
    # Iterate over all files in the specified directory
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.csv'):
                # Use parse_file_metadata to extract data_type and export_end_date
                data_type, export_end_date = parse_file_metadata(file)
                if data_type:  # Only process files with a recognized data type
                    file_path = os.path.join(root, file)
                    df = pd.read_csv(file_path)
                    # Count records grouped by export_end_date
                    csv_counts[data_type][export_end_date] += len(df)
    # Verify counts for each data type against corresponding Parquet files
    for data_type, date_counts in csv_counts.items():
        parquet_file = f"parquet/{data_type}.parquet"
        if os.path.exists(parquet_file):
            parquet_df = pd.read_parquet(parquet_file)
            # Count records grouped by export_end_date in the Parquet file
            parquet_counts = parquet_df.groupby('export_end_date').size().to_dict()
            # Compare CSV counts with Parquet counts
            for date, count in date_counts.items():
                parquet_count = parquet_counts.get(date, 0)
                if count != parquet_count:
                    print(f"Mismatch for {data_type} on {date}: CSV count = {count}, Parquet count = {parquet_count}")
        else:
            print(f"Parquet file for {data_type} not found. Expected file: {parquet_file}")


# Run verification on the current directory or specify the directory containing CSVs
verify_csv_to_parquet('.')

# List of JSON data types based on the provided table
json_data_types = [
    "altitude", "calories", "demographic_vo2_max", "distance", "exercise",
    "heart_rate", "lightly_active_minutes", "moderately_active_minutes",
    "resting_heart_rate", "sedentary_minutes", "steps", "swim_lengths_data",
    "time_in_heart_rate_zones", "very_active_minutes", "sleep",
]

def extract_identifiers_from_json(file_path):
    """
    Extracts unique identifiers from JSON records in the given file.

    Args:
        file_path (str): The path to the JSON file.

    Returns:
        set: A set of unique identifiers from the JSON records.
    """
    identifiers = set()
    try:
        with open(file_path, 'r') as json_file:
            data = json.load(json_file)
            # Check if data is a list of dictionaries
            if isinstance(data, list):
                for record in data:
                    if isinstance(record, dict):
                        # Check for the identifier fields and add them to the set
                        if 'dateTime' in record:
                            identifiers.add(record['dateTime'])
                        elif 'timestamp' in record:
                            identifiers.add(record['timestamp'])
                        elif 'logId' in record:
                            identifiers.add(record['logId'])
    except json.JSONDecodeError:
        print(f"Error reading JSON file: {file_path}")
    return identifiers

def extract_identifiers_from_parquet(parquet_file):
    """
    Extracts unique identifiers from the Parquet file.

    Args:
        parquet_file (str): The path to the Parquet file.

    Returns:
        set: A set of unique identifiers from the Parquet records.
    """
    identifiers = set()
    try:
        df = pd.read_parquet(parquet_file)
        # Check which identifier column exists and add values to the set
        if 'dateTime' in df.columns:
            identifiers.update(df['dateTime'].dropna().unique())
        elif 'timestamp' in df.columns:
            identifiers.update(df['timestamp'].dropna().unique())
        elif 'logId' in df.columns:
            identifiers.update(df['logId'].dropna().unique())
    except Exception as e:
        print(f"Error reading Parquet file: {parquet_file}. Error: {e}")
    return identifiers

def verify_json_to_parquet(directory):
    """
    Verifies that every JSON record from each data type has been written to
    Parquet by comparing the unique identifiers from JSON files to those in the
    corresponding Parquet file.

    Args:
        directory (str): The directory containing the JSON files.
    """
    # Dictionary to collect identifiers for each data type
    json_identifiers = {data_type: set() for data_type in json_data_types}
    # Collect JSON identifiers from all files
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.json'):
                data_type, _ = parse_file_metadata(file)
                if data_type in json_data_types:
                    file_path = os.path.join(root, file)
                    json_identifiers[data_type].update(extract_identifiers_from_json(file_path))
    # Compare identifiers between JSON and Parquet
    for data_type, identifiers in json_identifiers.items():
        parquet_file = f"parquet/{data_type}.parquet"
        if os.path.exists(parquet_file):
            parquet_identifiers = extract_identifiers_from_parquet(parquet_file)
            # Find missing identifiers
            missing_identifiers = identifiers - parquet_identifiers
            if missing_identifiers:
                print(f"Missing records for {data_type}: {len(missing_identifiers)} identifiers not found in Parquet.")
        else:
            print(f"Parquet file for {data_type} not found. Expected file: {parquet_file}")


# Specify the directory containing the JSON files
json_directory = "."  # Change this to the directory where your JSON files are located

# Run the verification
verify_json_to_parquet(json_directory)
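
# Optional hand-check (a minimal sketch, not part of the original verification
# logic): the module docstring above notes that `sleep_score` and
# `activity_goals` are mis-reported by this script even though their Parquet
# files contain the expected 1007 and 12 records. Printing the raw record
# counts makes that easy to confirm by eye.
for name, expected in [("sleep_score", 1007), ("activity_goals", 12)]:
    parquet_path = f"parquet/{name}.parquet"
    if os.path.exists(parquet_path):
        print(f"{name}: {len(pd.read_parquet(parquet_path))} records (expected {expected})")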