Last active
January 25, 2025 21:32
-
-
Save reefwing/160b467a8a0e69809cb97f5a2bfefe22 to your computer and use it in GitHub Desktop.
Downloads and re-samples the LGHG2@n10C_to_25degC battery data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2024 David Such | |
# | |
# This software is released under the MIT License. | |
# https://opensource.org/licenses/MIT | |
import os | |
import requests | |
import zipfile | |
import scipy.io | |
import pandas as pd | |
import numpy as np | |
# Location of the zipped data set on Mendeley Data
url = "https://data.mendeley.com/public-files/datasets/cp3473x7xv/files/ad7ac5c9-2b9e-458a-a91f-6f3da449bdfb/file_downloaded"

# Folder the ZIP archive is extracted into (created if it does not exist)
output_folder = os.path.expanduser("~/Documents/GitHub/Embedded-AI/data/LGHG2@n10C_to_25degC")
os.makedirs(output_folder, exist_ok=True)

# Sub-folders used by the rest of the script. "Train" and "Test" are expected
# to appear after extraction; "Preprocessed" receives the resampled CSV files.
train_folder = os.path.join(output_folder, "Train")
test_folder = os.path.join(output_folder, "Test")
preprocessed_folder = os.path.join(output_folder, 'Preprocessed')
os.makedirs(preprocessed_folder, exist_ok=True)

# Download and extract the data set only when either data folder is missing
if not os.path.exists(train_folder) or not os.path.exists(test_folder):
    print("Downloading LGHG2@n10C_to_25degC.zip (56 MB) ... ")
    download_folder = os.path.dirname(output_folder)
    filename = os.path.join(download_folder, "LGHG2@n10C_to_25degC.zip")

    # Stream the archive to disk in chunks instead of holding it all in memory.
    # The timeout keeps a stalled connection from hanging the script forever.
    with requests.get(url, stream=True, timeout=60) as response:
        # Fail here on an HTTP error rather than saving an error page as the
        # ZIP and hitting a confusing BadZipFile during extraction.
        response.raise_for_status()
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)

    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall(output_folder)
# Define helper function to read .mat files
def read_mat_files(folder):
    """Load every ``.mat`` file found directly inside *folder*.

    Files are loaded in sorted filename order. ``os.listdir`` makes no
    ordering guarantee, and callers index the returned list by position,
    so the order must be deterministic.

    Args:
        folder: Path of the directory to scan (sub-directories are not
            searched).

    Returns:
        A list with one ``scipy.io.loadmat`` result per ``.mat`` file, in
        sorted filename order. Empty if the folder has no ``.mat`` files.

    Raises:
        OSError: If *folder* cannot be listed (for example, it is missing).
    """
    mat_names = sorted(
        name for name in os.listdir(folder) if name.endswith(".mat")
    )
    return [scipy.io.loadmat(os.path.join(folder, name)) for name in mat_names]
# Load the contents of every .mat file in the training and test folders
fds_train = read_mat_files(train_folder)
fds_test = read_mat_files(test_folder)

# Pick the individual data sets out of the loaded lists by position
train_data_full = fds_train[0]
(
    test_data_full_n10deg,
    test_data_full_0deg,
    test_data_full_10deg,
    test_data_full_25deg,
) = (fds_test[0], fds_test[1], fds_test[2], fds_test[3])

# Report the array shapes so the layout of the loaded data is visible
print("Shape of train_data_full['X']: ", train_data_full['X'].shape)
print("Shape of train_data_full['Y']: ", train_data_full['Y'].shape)
print("Shape of test_data_full_n10deg['X']: ", test_data_full_n10deg['X'].shape)
print("Shape of test_data_full_n10deg['Y']: ", test_data_full_n10deg['Y'].shape)

# Training inputs (X) and targets (Y)
X_train, Y_train = train_data_full['X'], train_data_full['Y']

# Column ranges of the four consecutive segments of the training data
idx0, idx10 = slice(0, 184257), slice(184257, 337973)
idx25, idxN10 = slice(337973, 510530), slice(510530, 669956)

# Cut each segment out of the training arrays along the column axis
X_idx0, Y_idx0 = X_train[:, idx0], Y_train[:, idx0]
X_idx10, Y_idx10 = X_train[:, idx10], Y_train[:, idx10]
X_idx25, Y_idx25 = X_train[:, idx25], Y_train[:, idx25]
X_idxN10, Y_idxN10 = X_train[:, idxN10], Y_train[:, idxN10]

# Show the segment shapes as a sanity check on the slicing
print(f'X_idx0 shape: {X_idx0.shape}, Y_idx0 shape: {Y_idx0.shape}')
print(f'X_idx10 shape: {X_idx10.shape}, Y_idx10 shape: {Y_idx10.shape}')
print(f'X_idx25 shape: {X_idx25.shape}, Y_idx25 shape: {Y_idx25.shape}')
print(f'X_idxN10 shape: {X_idxN10.shape}, Y_idxN10 shape: {Y_idxN10.shape}')
# Resample and compute new moving averages
def resample_and_compute_moving_averages(X, Y, step=100, window=6):
    """Down-sample X and Y along the column axis and rebuild the averages.

    Every ``step``-th column of *X* and *Y* is kept. Rows 3 and 4 of the
    resampled X (average voltage and average current) are then replaced by
    trailing moving averages of rows 0 and 1 of the resampled data. Near the
    start, where fewer than ``window`` samples exist, the average uses only
    the samples available.

    The caller's *X* is never modified: the resampled array is a copy.

    Args:
        X: 2-D array with at least 5 rows; columns are samples.
        Y: 2-D array whose columns are samples.
        step: Keep one column out of every ``step`` columns.
        window: Number of resampled samples (current one included) in each
            trailing average. The default of 6 matches the original
            ``i-5 .. i`` range.

    Returns:
        Tuple ``(X_resampled, Y_resampled)``. ``X_resampled`` is a new
        array; ``Y_resampled`` is a strided view of *Y*.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")

    # Basic slicing returns a view, so without the copy the row assignments
    # below would overwrite the caller's X in place.
    X_resampled = X[:, ::step].copy()
    Y_resampled = Y[:, ::step]

    n = X_resampled.shape[1]
    avg_voltage_idx = 3  # The 4th row (index 3) is average voltage
    avg_current_idx = 4  # The 5th row (index 4) is average current

    # Rows 0 and 1 are the sources for the averages and are left unchanged
    source_voltage = X_resampled[0]
    source_current = X_resampled[1]

    new_avg_voltage = np.empty(n)
    new_avg_current = np.empty(n)
    for i in range(n):
        start = max(0, i - window + 1)
        new_avg_voltage[i] = np.mean(source_voltage[start:i + 1])
        new_avg_current[i] = np.mean(source_current[start:i + 1])

    X_resampled[avg_voltage_idx, :] = new_avg_voltage
    X_resampled[avg_current_idx, :] = new_avg_current
    return X_resampled, Y_resampled
# Column names shared by every exported table: the five rows of X followed
# by the single row of Y.
column_names = ['Voltage', 'Current', 'Temperature', 'Average Voltage', 'Average Current', 'SOC']

# Resample and compute new moving averages for training data
X_train_resampled, Y_train_resampled = resample_and_compute_moving_averages(X_train, Y_train)

# Create DataFrame (one row per sample) and save to CSV
train_df = pd.DataFrame(np.vstack((X_train_resampled, Y_train_resampled)).T,
                        columns=column_names)
train_df.to_csv(os.path.join(preprocessed_folder, 'resampled_training_data.csv'), index=False)

# Extract and resample test data
test_data_files = ['n10degC', '0degC', '10degC', '25degC']

# Labels are matched to the loaded test sets by position, so check the counts
# before writing anything instead of failing part-way through the loop.
if len(fds_test) != len(test_data_files):
    raise ValueError(
        f'Expected {len(test_data_files)} test data sets but found {len(fds_test)}'
    )

resampled_test_data_shapes = {}
for test_label, test_data_full in zip(test_data_files, fds_test):
    X_test = test_data_full['X']
    Y_test = test_data_full['Y']
    X_test_resampled, Y_test_resampled = resample_and_compute_moving_averages(X_test, Y_test)
    test_df = pd.DataFrame(np.vstack((X_test_resampled, Y_test_resampled)).T,
                           columns=column_names)
    test_df.to_csv(os.path.join(preprocessed_folder, f'resampled_test_data_{test_label}.csv'), index=False)
    resampled_test_data_shapes[test_label] = (X_test_resampled.shape, Y_test_resampled.shape)

# Print shapes to verify resampling
print(f'Training data shape after resampling: X={X_train_resampled.shape}, Y={Y_train_resampled.shape}')
for test_file, shapes in resampled_test_data_shapes.items():
    print(f'{test_file} test data shape after resampling: X={shapes[0]}, Y={shapes[1]}')

# Combine X and Y into a single DataFrame
data_resampled = np.vstack((X_train_resampled, Y_train_resampled))
df_resampled = pd.DataFrame(data_resampled.T, columns=column_names)

# Display the first 8 rows
print(df_resampled.head(8).to_string(index=False))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment