Skip to content

Instantly share code, notes, and snippets.

@vrbadev
Last active May 22, 2024 14:37
Show Gist options
  • Save vrbadev/2decb33e909bb1d4e08544e407a67935 to your computer and use it in GitHub Desktop.
Save vrbadev/2decb33e909bb1d4e08544e407a67935 to your computer and use it in GitHub Desktop.
ROS-less python rosbag topic extraction (using rosbags library)
from pathlib import Path
from rosbags.highlevel import AnyReader
from contextlib import nullcontext
from tqdm import tqdm
# aux function to get a list of all rosbag topic names
def rosbag_topics(bag_path):
with AnyReader([Path(bag_path)]) as reader:
return reader.topics
# iterator over selected topic in the rosbag file (optionally with tqdm progress bar)
def rosbag_topic_messages(bag_path, topic_name, use_tqdm=False):
with AnyReader([Path(bag_path)]) as reader:
connections = [c for c in reader.connections if c.topic == topic_name]
with tqdm(total=sum([c.msgcount for c in connections])) if use_tqdm else nullcontext() as pbar:
for i, (c, bag_stamp, rawdata) in enumerate(reader.messages(connections=connections)):
yield i, bag_stamp, reader.deserialize(rawdata, c.msgtype)
if use_tqdm: pbar.update(1)
# example usage: extract IMU topic (msg type sensor_msgs/Imu) to pandas dataframe
import pandas as pd
import numpy as np
if __name__ == "__main__":
columns = ["bag_stamp", "pub_stamp", "orientation", "orientation_covariance", "angular_velocity", "angular_velocity_covariance", "linear_acceleration", "linear_acceleration_covariance"]
rows_list = list() # using lists and dicts is significantly faster than dataframe concatenation
for i, bag_stamp, msg in rosbag_topic_messages("path/to/file.bag", "/uav/mavros/imu/data", use_tqdm=True):
pub_stamp = int(1e9 * msg.header.stamp.sec + msg.header.stamp.nanosec)
orientation_q = np.array([msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w])
angular_velocity_v = np.array([msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z])
linear_acceleration_v = np.array([msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z])
values = [bag_stamp, pub_stamp, orientation_q, msg.orientation_covariance, angular_velocity_v, msg.angular_velocity_covariance, linear_acceleration_v, msg.linear_acceleration_covariance]
rows_list.append(dict(zip(columns, values)))
df_imu = pd.DataFrame(rows_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment