Created
January 23, 2023 16:36
-
-
Save IamPhytan/b2e411c2d064d01f3a5763017a48d53d to your computer and use it in GitHub Desktop.
MCAP to CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
from collections import OrderedDict | |
from pathlib import Path | |
from typing import Dict, Tuple, Union | |
import pandas as pd | |
from mcap.reader import McapReader, make_reader | |
from mcap.records import Channel, Statistics | |
from mcap.summary import Summary | |
from mcap_ros1.reader import read_ros1_messages | |
from mcap_ros2.reader import read_ros2_messages | |
from tf_transformations import euler_from_quaternion | |
from tqdm import tqdm | |
class MCAPPath(type(Path())): | |
csv_type: str | None = None | |
def parse_arguments(): | |
"""Parse mcapfile name""" | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-b", "--mcapfile", help="""MCAP file path""", required=True) | |
args = parser.parse_args() | |
mcapfile = args.mcapfile | |
mcap_filepath = MCAPPath(mcapfile) | |
return mcap_filepath | |
def get_ros_profile(reader: McapReader): | |
"""Get ROS Profile from a McapReader | |
Args: | |
filename (Path | str): _description_ | |
Returns: | |
str: ROS profile of MCAP messages (ros1 / ros2) | |
""" | |
header = reader.get_header() | |
return header.profile | |
def slugify(topic: str): | |
"""Convert topic into a filename-compatible string | |
Slugify | |
Args: | |
topic: ROS Topic | |
Returns: | |
str: Slugified ROS Topic | |
""" | |
topic = topic[1:] if topic.startswith("/") else topic | |
return topic.replace("/", "_") | |
def get_topics(csv_type: str): | |
"""Get list of topics according to the csv type | |
Args: | |
csv_type (str): Type of CSV to export | |
Returns: | |
list: List of topics to export for the CSV type | |
""" | |
ToI = { | |
"elec": [ | |
"/mcu/status", | |
"/left_drive/status/battery_voltage", | |
"/left_drive/status/battery_current", | |
"/right_drive/status/battery_voltage", | |
"/right_drive/status/battery_current", | |
], | |
"imu": [ | |
"/imu/data", | |
"/imu_and_wheel_odom", | |
"/MTI_imu/data", | |
], | |
"gps": [ | |
"/gps/fix_front", | |
"/gps/fix_back", | |
"/gps/vel_front", | |
"/gps/vel_back", | |
"/gps/time_reference_front", | |
"/gps/time_reference_back", | |
], | |
"odom": [ | |
"/cmd_vel", | |
"/hri_cmd_vel", | |
"/rc_teleop/cmd_vel", | |
"/left_drive/status/speed", | |
"/left_drive/velocity", | |
"/right_drive/status/speed", | |
"/right_drive/velocity", | |
"/velocity", | |
"/warthog_velocity_controller/odom", | |
"/warthog_velocity_controller/cmd_vel", | |
"/icp_odom", | |
], | |
"temp": [ | |
"/left_drive/status/heatsink_temperature", | |
"/left_drive/status/motor_temperature", | |
"/right_drive/status/heatsink_temperature", | |
"/right_drive/status/motor_temperature", | |
], | |
"doughnut": [ | |
"/calib_switch", | |
], | |
} | |
return ToI[csv_type] | |
def decompose_msg(topic: str, msg): | |
"""Get important elements from a rosbag msg | |
Args: | |
topic (str): Topic of the msg | |
msg (rosbag.msg): Structure that contains information at a certain time in the rosbag | |
Raises: | |
NotImplementedError: Topic is not implemented | |
Returns: | |
dict: Dictionary of important values from the msg | |
""" | |
if topic == "/mcu/status": | |
return { | |
"measured_battery": msg.measured_battery, | |
"measured_48v": msg.measured_48v, | |
"measured_24v": msg.measured_24v, | |
"measured_12v": msg.measured_12v, | |
"current_battery": msg.current_battery, | |
"current_48v": msg.current_48v, | |
"current_24v": msg.current_24v, | |
"current_12v": msg.current_12v, | |
"current_computer": msg.current_computer, | |
} | |
elif topic in ["/imu/data", "/MTI_imu/data"]: | |
timestamp = msg.header.stamp.to_sec() | |
orient = msg.orientation | |
ang_vel = msg.angular_velocity | |
lin_acc = msg.linear_acceleration | |
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w]) | |
roll, pitch, yaw = euler | |
return { | |
"time": timestamp, | |
"orientation": { | |
"x": orient.x, | |
"y": orient.y, | |
"z": orient.z, | |
"w": orient.w, | |
"roll": roll, | |
"pitch": pitch, | |
"yaw": yaw, | |
}, | |
"ang_vel": {"x": ang_vel.x, "y": ang_vel.y, "z": ang_vel.z}, | |
"lin_acc": {"x": lin_acc.x, "y": lin_acc.y, "z": lin_acc.z}, | |
} | |
elif topic in ["/imu_and_wheel_odom", "/icp_odom"]: | |
timestamp = msg.header.stamp.to_sec() | |
pose = msg.pose.pose | |
posit = pose.position | |
orient = pose.orientation | |
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w]) | |
roll, pitch, yaw = euler | |
twist = msg.twist.twist | |
lin_twi = twist.linear | |
ang_twi = twist.angular | |
return { | |
"time": timestamp, | |
"position": {"x": posit.x, "y": posit.y, "z": posit.z}, | |
"orientation": { | |
"x": orient.x, | |
"y": orient.y, | |
"z": orient.z, | |
"w": orient.w, | |
"roll": roll, | |
"pitch": pitch, | |
"yaw": yaw, | |
}, | |
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z}, | |
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z}, | |
} | |
elif topic == "/warthog_velocity_controller/odom": | |
timestamp = msg.header.stamp.to_sec() | |
pose = msg.pose.pose | |
posit = pose.position | |
orient = pose.orientation | |
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w]) | |
roll, pitch, yaw = euler | |
twist = msg.twist.twist | |
lin_twi = twist.linear | |
ang_twi = twist.angular | |
return { | |
"time": timestamp, | |
"position": {"x": posit.x, "y": posit.y, "z": posit.z}, | |
"orientation": { | |
"x": orient.x, | |
"y": orient.y, | |
"z": orient.z, | |
"w": orient.w, | |
"roll": roll, | |
"pitch": pitch, | |
"yaw": yaw, | |
}, | |
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z}, | |
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z}, | |
} | |
elif "/gps/fix_" in topic: | |
# GPS Fix position | |
timestamp = msg.header.stamp.to_sec() | |
return { | |
"time": timestamp, | |
"latitude": msg.latitude, | |
"longitude": msg.longitude, | |
"altitude": msg.altitude, | |
} | |
elif "/gps/vel_" in topic: | |
# GPS Fix position | |
timestamp = msg.header.stamp.to_sec() | |
lin_twi = msg.twist.linear | |
ang_twi = msg.twist.angular | |
return { | |
"time": timestamp, | |
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z}, | |
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z}, | |
} | |
elif "/gps/time_reference_" in topic: | |
# GPS Fix position | |
timestamp = msg.header.stamp.to_sec() | |
time_ref = msg.time_ref.to_sec() | |
return {"time": timestamp, "time_ref": time_ref} | |
elif topic == "/velocity": | |
# Twist Stamped messaged | |
timestamp = msg.header.stamp.to_sec() | |
lin_twi = msg.twist.linear | |
ang_twi = msg.twist.angular | |
return { | |
"time": timestamp, | |
"linear": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z}, | |
"angular": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z}, | |
} | |
elif "cmd_vel" in topic: | |
# command velocity | |
lin_twi = msg.linear | |
ang_twi = msg.angular | |
return { | |
"linear": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z}, | |
"angular": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z}, | |
} | |
elif "_drive/status/speed" in topic or "_drive/velocity" in topic: | |
# Drive velocity data | |
return msg.data | |
elif "_drive/status/battery" in topic: | |
# Drive battery electrical data | |
return msg.data | |
elif "_temperature" in topic: | |
# Drive temperature data | |
return msg.data | |
elif topic == "/calib_switch": | |
# Doughnut calibration switch | |
return msg.data | |
else: | |
raise NotImplementedError(f"Not yet implemented : {topic}") | |
ChannelOrStatistics = Union[Channel, Statistics] | |
def get_topic_metadata(summary: Summary, topic: str) -> Dict[str, ChannelOrStatistics]: | |
"""Get metadata for a specific topic | |
Args: | |
summary: _description_ | |
topic: Topic name | |
""" | |
stats = summary.statistics | |
channs = summary.channels | |
top_ch = tuple(ch for ch in channs.values() if ch.topic == topic) | |
if len(top_ch) == 0: | |
raise ValueError(f"Topic '{topic}' is not available in the mcap file") | |
top_ch = top_ch[0] | |
return {"channel": top_ch, "stats": stats} | |
def get_topic_msgs(topic_: str, reader: McapReader): | |
"""Get rosbag messages from a certain topic | |
Args: | |
topic_: Topic of interest | |
reader: Bag object created with the `rosbag` library | |
Returns: | |
List: List of messages from the topic. Each message is given in the form of a dictionary | |
""" | |
time_dat = OrderedDict.fromkeys(["time", topic_], 0) | |
data_list = [] | |
# Get the right reader from the header profile | |
profile = reader.get_header().profile | |
read_funcs = {"ros1": read_ros1_messages, "ros2": read_ros2_messages} | |
func = read_funcs.get(profile.lower(), None) | |
if func is None: | |
raise NotImplementedError(f"Profile {profile} not implemented") | |
topic_meta = get_topic_metadata(reader.get_summary(), topic_) | |
top_ch = topic_meta["channel"] | |
top_stats = topic_meta["stats"] | |
msg_count = top_stats.channel_message_counts[top_ch.id] | |
with tqdm(total=msg_count) as pbar: | |
for msg in func(source=reader, topics=[topic_]): | |
t = msg.publish_time_ns / 1e9 | |
time_dat["time"] = t | |
time_dat[topic_] = decompose_msg(topic_, msg.ros_msg) | |
# # Stretch out time_dat | |
normalized = pd.json_normalize( | |
time_dat, | |
sep="/", | |
record_prefix="/", | |
) | |
normalized = normalized.to_dict(orient="records")[0] | |
# New csv row | |
data_list.append(normalized) | |
# Progress bar | |
pbar.update(1) | |
return data_list | |
def export_to_csv(topic, msg_arr, mcap_path: Path): | |
df = pd.DataFrame(msg_arr) | |
if len(df) > 1: | |
df = df[["time"] + [c for c in df if c != "time"]] | |
# Filter CSV | |
df = df.fillna(method="pad") | |
df = df.dropna() | |
def is_parent(df: pd.DataFrame, name: str): | |
"""Determine if name in df columns is a parent of other columns""" | |
columns = ",".join(df.columns.tolist()) | |
has_children = (name + "/") in columns | |
is_zero = ~(df[name].astype(bool)).all() | |
return has_children and is_zero | |
parent_cols = [c for c in df.columns.tolist() if is_parent(df, c)] | |
df = df.drop(columns=parent_cols) | |
# Create a folder for CSV | |
type_path = mcap_path.parent.resolve() / mcap_path.csv_type | |
if not type_path.exists(): | |
type_path.mkdir(parents=True, exist_ok=True) | |
# Export df to CSV | |
out_filename = f"{mcap_path.stem}_{slugify(topic)}.csv" | |
csv_path = type_path / out_filename | |
print("Saving to", csv_path, "with columns :\n", *df.columns.values) | |
df.to_csv(csv_path, index=False) | |
def main(): | |
mcapfp = parse_arguments() | |
mcapfp.__slots__ = (mcapfp.__slots__) + ("csv_type",) | |
mcapfp.csv_type = mcapfp.stem.rsplit("_", 1)[-1] | |
print(mcapfp.csv_type, mcapfp) | |
topics = get_topics(mcapfp.csv_type) | |
with open(mcapfp, "rb") as f: | |
reader = make_reader(f) | |
for topic in topics: | |
try: | |
messages = get_topic_msgs(topic, reader) | |
export_to_csv(topic, messages, mcapfp) | |
except ValueError as ve: | |
print(ve) | |
finally: | |
continue | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment