Skip to content

Instantly share code, notes, and snippets.

@michalpelka
Created October 14, 2024 14:29
Show Gist options
  • Save michalpelka/93155e3b576d1e37dab9c4c8171750fe to your computer and use it in GitHub Desktop.
Save michalpelka/93155e3b576d1e37dab9c4c8171750fe to your computer and use it in GitHub Desktop.
import os
import rosbag2_py
import numpy as np
import struct
from sensor_msgs.msg import PointCloud2
from sensor_msgs_py import point_cloud2
from rclpy.serialization import deserialize_message
def read_bag_convert_to_csv(bag_file, topic, output_csv):
# Open ROS 2 bag
storage_options = rosbag2_py.StorageOptions(uri=bag_file, storage_id='sqlite3')
converter_options = rosbag2_py.ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr')
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)
# Get list of topics and types
topic_types = reader.get_all_topics_and_types()
topic_type_map = {t.name: t.type for t in topic_types}
print (f"Topics in bag: {topic_type_map}")
print (f"Reading from topic: {topic}")
# Open output CSV file
with open(output_csv, 'w') as csv_file:
# Write header
csv_file.write("x,y,z\n")
# Iterate through messages
while reader.has_next():
(topic_name, data, t) = reader.read_next()
if topic_name == topic:
pointcloud_msg = deserialize_message(data, PointCloud2)
# Convert PointCloud2 to an iterable format
point_cloud = point_cloud2.read_points(pointcloud_msg, field_names=("x", "y", "z"), skip_nans=True)
# Write each point to CSV
for point in point_cloud:
x, y, z = point
csv_file.write(f"{x},{y},{z}\n")
print(f"Data written to {output_csv}")
# Usage
if __name__ == "__main__":
bag_file = '/home/michalpelka/Desktop/rosbag2_2024_10_14-15_57_03/'
topic = '/camera/camera/depth/color/points'
output_csv = 'output.csv'
read_bag_convert_to_csv(bag_file, topic, output_csv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment