@HB-Stratos
Created September 16, 2025 23:39
A Python script that loads an ArduPilot log (or a MAVLink telemetry log; use read_file = MavTLog), converts it to pandas DataFrames, and then saves these to files with Parquet. This means that on future runs of the program one doesn't have to repeat the ~2 minute log read, but can load the cache in ~3 s.
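The telemetry-log variant mentioned above only changes the constructor; a minimal sketch, where the .tlog path is a hypothetical placeholder:

from pymavlog import MavTLog

read_file = MavTLog("data/bump_flight/flight.tlog")  # hypothetical example path
read_file.parse()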
import logging
from pathlib import Path

import numpy as np
import pandas as pd
from pymavlog import MavLog, MavTLog
from pymavlog.core import MavLinkMessageSeries

logger = logging.getLogger(__name__)


def series_to_dataframe(series: MavLinkMessageSeries, time_field: str = "timestamp") -> pd.DataFrame:
    """Convert a MavLinkMessageSeries into a pandas DataFrame.

    The 'time_field' column is interpreted as seconds or microseconds based on
    the field name. If a microseconds field is available, it is used from the
    second datapoint onwards to get more accurate timing than the epoch stamp
    can provide; however, if the internal timer has drift this can introduce
    inaccuracies. This function was also partially written by ChatGPT and may
    have some as-of-yet untested issues.

    :param MavLinkMessageSeries series: the message series to convert
    :param str time_field: name of the epoch time field, defaults to "timestamp"
    :raises KeyError: if 'time_field' is missing from series.raw_fields
    :return pd.DataFrame: one column per (flattened) field, indexed by a datetime "time" index
    """
    data = {}
    for field, arr in series.raw_fields.items():
        arr = np.array(arr)
        if arr.ndim == 1:
            data[field] = arr
        elif arr.ndim == 2:
            # Create one new column per component
            for idx in range(arr.shape[1]):
                data[f"{field}_{idx}"] = arr[:, idx]
        else:
            # Skip higher-dimensional arrays; these should not occur in practice
            logger.warning("Skipping field '%s': unexpected %d-dimensional array", field, arr.ndim)
            continue
    df = pd.DataFrame(data)
    if df.empty:
        return df
    if time_field not in df.columns:
        raise KeyError(f"No '{time_field}' in series.raw_fields")
    POSSIBLE_US_KEYS = ["TimeUS", "time_usec"]
    # Collect the microsecond keys present in this frame; use the first match
    usec_fields = [key for key in POSSIBLE_US_KEYS if key in df]
    if usec_fields and time_field in df:
        usec_field = usec_fields[0]
        # Take the initial epoch stamp and offset it by the more precise
        # microsecond counter, measured relative to the first datapoint
        offset_usec = pd.to_timedelta(df[usec_field] - df[usec_field].iloc[0], unit="us")
        initial_time = pd.to_datetime(df[time_field].iloc[0], unit="s")
        df["time"] = initial_time + offset_usec
        df = df.set_index("time").sort_index()
        return df
unit = "s" if time_field == "timestamp" else "us"
df["time"] = pd.to_datetime(df[time_field], unit=unit)
df = df.set_index("time").sort_index()
return df
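
# Example of the expansion performed above: a 2-D raw field such as a
# quaternion array "Q" of shape (n, 4) becomes columns Q_0..Q_3, while a
# 1-D field such as "Roll" stays a single column. ("Q" and "Roll" are
# illustrative names, not fields guaranteed to exist in every log.)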

def log_to_dataframes(mavlog: MavLog) -> dict[str, pd.DataFrame]:
    output: dict[str, pd.DataFrame] = {}
    for msg_series_name in mavlog.types:
        output[msg_series_name] = series_to_dataframe(mavlog[msg_series_name])
    return output

def write_cache(dictionary: dict[str, pd.DataFrame], target_path: Path) -> None:
    for key, dataframe in dictionary.items():
        # Name cache files "<key>.gzip"; read_cache() globs for this suffix
        dataframe.to_parquet((target_path / key).with_suffix(".gzip"), compression="gzip")

def read_cache(target_path: Path) -> dict[str, pd.DataFrame]:
    output: dict[str, pd.DataFrame] = {}
    if not target_path.exists():
        return output
    # Read all .gzip files in the cache directory
    for parquet_file in target_path.glob("*.gzip"):
        key = parquet_file.stem  # Filename without the .gzip extension
        output[key] = pd.read_parquet(parquet_file)
    return output
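
# Note: with the default pyarrow engine, Parquet round-trips the DataFrame
# index, so cached frames come back with the same datetime "time" index that
# series_to_dataframe() set.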

read_file = MavLog("data/bump_flight/20240314_Earhard_02.BIN")
read_file.parse()
read_file_df = log_to_dataframes(read_file)

read_file_path = Path.cwd() / "cache" / "FLIGHTNAME"
read_file_path.mkdir(parents=True, exist_ok=True)
write_cache(read_file_df, read_file_path)

cache_loaded = read_cache(read_file_path)
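
On later runs the expensive parse can be skipped entirely once the cache is populated. A minimal sketch of that warm-start check, reusing the helpers above; the guard itself is an assumption and not part of the original script:

cache_path = Path.cwd() / "cache" / "FLIGHTNAME"
if cache_path.exists() and any(cache_path.glob("*.gzip")):
    # Warm start (~3 s): load the Parquet cache written on a previous run
    dataframes = read_cache(cache_path)
else:
    # Cold start (~2 min): parse the raw log and populate the cache
    log = MavLog("data/bump_flight/20240314_Earhard_02.BIN")
    log.parse()
    dataframes = log_to_dataframes(log)
    cache_path.mkdir(parents=True, exist_ok=True)
    write_cache(dataframes, cache_path)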