A Python script that loads an ArduPilot log (or a MAVLink telemetry log; swap in read_file = MavTLog(...)), converts it to pandas DataFrames, and saves those to disk as Parquet files. On future runs of the program one can skip the ~2 minute log read and load the cache in ~3 s instead; a hypothetical wrapper that automates this choice is sketched at the end of the script.
import logging
from pathlib import Path

import numpy as np
import pandas as pd
from pymavlog import MavLog, MavTLog
from pymavlog.core import MavLinkMessageSeries

logger = logging.getLogger(__name__)
def series_to_dataframe(series: MavLinkMessageSeries, time_field: str = "timestamp") -> pd.DataFrame:
    """Convert a MavLinkMessageSeries into a pandas DataFrame.

    The 'time_field' array is interpreted as seconds or microseconds based on
    its name. If a microsecond counter field is available, it is used from the
    second datapoint onwards to get more accurate timing than the epoch stamp
    can provide; drift in the internal timer can however introduce
    inaccuracies. A small synthetic demo of this offset arithmetic follows the
    function. This function was partially written by ChatGPT and may have some
    as of yet untested issues.

    :param MavLinkMessageSeries series: the message series to convert
    :param str time_field: name of the time column, defaults to "timestamp"
    :raises KeyError: if 'time_field' is not among the series fields
    :return pd.DataFrame: one column per (flattened) field, indexed by the
        reconstructed time
    """
    data = {}
    for field, arr in series.raw_fields.items():
        arr = np.array(arr)
        if arr.ndim == 1:
            data[field] = arr
        elif arr.ndim == 2:
            # Create one new field per component
            for idx in range(arr.shape[1]):
                data[f"{field}_{idx}"] = arr[:, idx]
        else:
            # Skip higher-dimensional arrays
            logger.warning("Skipping field '%s': unexpected %d-dimensional array", field, arr.ndim)
            continue
    df = pd.DataFrame(data)
    if df.empty:
        return df
    if time_field not in df.columns:
        raise KeyError(f"No '{time_field}' in series.raw_fields")
    POSSIBLE_US_KEYS = ["TimeUS", "time_usec"]
    # Use the first matching microsecond counter field, if any is present
    usec_fields = [key for key in POSSIBLE_US_KEYS if key in df]
    if usec_fields:
        usec_field = usec_fields[0]
        # Take the initial epoch stamp and refine it with the relative microsecond
        # counter (time_field is guaranteed to exist by the check above)
        offset = pd.to_timedelta(df[usec_field] - df[usec_field].iloc[0], unit="us")
        initial_time = pd.to_datetime(df[time_field].iloc[0], unit="s")
        df["time"] = initial_time + offset
        df = df.set_index("time").sort_index()
        return df
    unit = "s" if time_field == "timestamp" else "us"
    df["time"] = pd.to_datetime(df[time_field], unit=unit)
    df = df.set_index("time").sort_index()
    return df
def log_to_dataframes(mavlog: MavLog) -> dict[str, pd.DataFrame]:
    output: dict[str, pd.DataFrame] = {}
    for msg_series_name in mavlog.types:
        output[msg_series_name] = series_to_dataframe(mavlog[msg_series_name])
    return output
def write_cache(dictionary: dict[str, pd.DataFrame], target_path: Path) -> None:
    for key, dataframe in dictionary.items():
        # One gzip-compressed Parquet file per message type, named "<type>.gzip"
        dataframe.to_parquet((target_path / key).with_suffix(".gzip"), compression="gzip")
def read_cache(target_path: Path) -> dict[str, pd.DataFrame]:
    output: dict[str, pd.DataFrame] = {}
    if not target_path.exists():
        return output
    # Read all .gzip files in the cache directory
    for parquet_file in target_path.glob("*.gzip"):
        key = parquet_file.stem  # filename without the .gzip extension
        output[key] = pd.read_parquet(parquet_file)
    return output
read_file = MavLog("data/bump_flight/20240314_Earhard_02.BIN")  # use MavTLog(...) for telemetry logs
read_file.parse()
read_file_df = log_to_dataframes(read_file)

read_file_path = Path.cwd() / "cache" / "FLIGHTNAME"
read_file_path.mkdir(parents=True, exist_ok=True)
write_cache(read_file_df, read_file_path)

cache_loaded = read_cache(read_file_path)
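# Hypothetical convenience wrapper, not part of the original gist: parse the
# raw log only when no cache exists yet, otherwise load the fast Parquet cache.
def load_or_parse(log_path: Path, cache_path: Path) -> dict[str, pd.DataFrame]:
    cached = read_cache(cache_path)
    if cached:
        return cached
    log = MavLog(str(log_path))  # swap in MavTLog for telemetry logs
    log.parse()
    dataframes = log_to_dataframes(log)
    cache_path.mkdir(parents=True, exist_ok=True)
    write_cache(dataframes, cache_path)
    return dataframes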