Created
June 16, 2023 12:52
-
-
Save horverno/5f2bab8449a8aadb583cabb8d71298a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reading from a MCAP file | |
# sudo apt install ros-$ROS_DISTRO-ros-base ros-$ROS_DISTRO-ros2bag ros-$ROS_DISTRO-rosbag2-transport ros-$ROS_DISTRO-rosbag2-storage-mcap | |
from mcap import __version__ | |
print(__version__) | |
"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API.""" | |
import argparse | |
from rclpy.serialization import deserialize_message | |
from rosidl_runtime_py.utilities import get_message | |
import rosbag2_py | |
def read_messages(input_bag: str): | |
reader = rosbag2_py.SequentialReader() | |
reader.open( | |
rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"), | |
rosbag2_py.ConverterOptions( | |
input_serialization_format="cdr", output_serialization_format="cdr" | |
), | |
) | |
topic_types = reader.get_all_topics_and_types() | |
def typename(topic_name): | |
for topic_type in topic_types: | |
if topic_type.name == topic_name: | |
return topic_type.type | |
raise ValueError(f"topic {topic_name} not in bag") | |
while reader.has_next(): | |
topic, data, timestamp = reader.read_next() | |
msg_type = get_message(typename(topic)) | |
msg = deserialize_message(data, msg_type) | |
yield topic, msg, timestamp | |
del reader | |
def main(): | |
inpu = "/mnt/c/bag/gyor_ros2_lexus_gps/gyor_pelda_odom.mcap" | |
i = 0 | |
first_run = True | |
for topic, msg, timestamp in read_messages(inpu): | |
if(first_run): | |
timestamp_start = timestamp | |
first_run = False | |
if (i % 1000 == 0): | |
if(topic == "/lexus3/vehicle_steering"): | |
print("steer: %.2f (%.4f)" % (msg.data, (timestamp-timestamp_start) / 1000000000)) | |
if(topic == "/lexus3/gps/duro/imu"): | |
print("imu: %.2f %.2f (%.4f)" % (msg.angular_velocity.x, msg.angular_velocity.y, (timestamp-timestamp_start) / 1000000000)) | |
#print("%s <%s>" % (topic, type(msg))) | |
i += 1 | |
if __name__ == "__main__": | |
main() | |
Author
horverno
commented
Sep 29, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment