Last active
May 22, 2024 14:37
-
-
Save vrbadev/2decb33e909bb1d4e08544e407a67935 to your computer and use it in GitHub Desktop.
ROS-less python rosbag topic extraction (using rosbags library)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
from rosbags.highlevel import AnyReader | |
from contextlib import nullcontext | |
from tqdm import tqdm | |
# aux function to get a list of all rosbag topic names | |
def rosbag_topics(bag_path): | |
with AnyReader([Path(bag_path)]) as reader: | |
return reader.topics | |
# iterator over selected topic in the rosbag file (optionally with tqdm progress bar) | |
def rosbag_topic_messages(bag_path, topic_name, use_tqdm=False): | |
with AnyReader([Path(bag_path)]) as reader: | |
connections = [c for c in reader.connections if c.topic == topic_name] | |
with tqdm(total=sum([c.msgcount for c in connections])) if use_tqdm else nullcontext() as pbar: | |
for i, (c, bag_stamp, rawdata) in enumerate(reader.messages(connections=connections)): | |
yield i, bag_stamp, reader.deserialize(rawdata, c.msgtype) | |
if use_tqdm: pbar.update(1) | |
# example usage: extract IMU topic (msg type sensor_msgs/Imu) to pandas dataframe | |
import pandas as pd | |
import numpy as np | |
if __name__ == "__main__": | |
columns = ["bag_stamp", "pub_stamp", "orientation", "orientation_covariance", "angular_velocity", "angular_velocity_covariance", "linear_acceleration", "linear_acceleration_covariance"] | |
rows_list = list() # using lists and dicts is significantly faster than dataframe concatenation | |
for i, bag_stamp, msg in rosbag_topic_messages("path/to/file.bag", "/uav/mavros/imu/data", use_tqdm=True): | |
pub_stamp = int(1e9 * msg.header.stamp.sec + msg.header.stamp.nanosec) | |
orientation_q = np.array([msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w]) | |
angular_velocity_v = np.array([msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z]) | |
linear_acceleration_v = np.array([msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z]) | |
values = [bag_stamp, pub_stamp, orientation_q, msg.orientation_covariance, angular_velocity_v, msg.angular_velocity_covariance, linear_acceleration_v, msg.linear_acceleration_covariance] | |
rows_list.append(dict(zip(columns, values))) | |
df_imu = pd.DataFrame(rows_list) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment