Created
January 15, 2024 20:58
-
-
Save Merwanski/8a3d12a1a0a2a427eff202587ac7fa04 to your computer and use it in GitHub Desktop.
ros2bag_data_extractor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import cv2 | |
import open3d as o3d | |
import numpy as np | |
from sensor_msgs.msg import PointCloud2 | |
from sensor_msgs.msg import CompressedImage | |
import rclpy | |
from rclpy.node import Node | |
from rclpy.qos import QoSProfile | |
import pcl | |
import time | |
import ros2_numpy as rnp | |
class DataExtractor(Node): | |
def __init__(self): | |
super().__init__('data_extractor') | |
self.create_subscription( | |
CompressedImage, | |
'/zed2i/zed_node/left/image_rect_color/compressed', | |
self.image_callback, | |
QoSProfile(depth=100), | |
) | |
self.create_subscription( | |
PointCloud2, | |
'/zed2i/zed_node/point_cloud/cloud_registered', | |
self.point_cloud_callback, | |
QoSProfile(depth=100), | |
) | |
script_directory = os.path.dirname(os.path.abspath(__file__)) | |
self.image_directory = os.path.join(script_directory, 'dataExtracted', 'leftImages') | |
self.point_cloud_directory = os.path.join(script_directory, 'dataExtracted', 'pointCloud') | |
# Performance Monitoring | |
self.last_save_time = time.time() | |
self.last_image_save_time = time.time() | |
self.save_warning_threshold = 0.5 # seconds | |
self.image_save_warning_threshold = 0.5 # seconds | |
self.get_logger().info('Data Extractor is ready.') | |
def image_callback(self, msg): | |
try: | |
# Extract timestamp from the message | |
timestamp = msg.header.stamp | |
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6 | |
# Create the directory if it doesn't exist | |
os.makedirs(self.image_directory, exist_ok=True) | |
# Construct the file path | |
image_filename = os.path.join(self.image_directory, f'{timestamp_ms:.4f}ms.png') | |
# Skip saving if the file already exists | |
if os.path.exists(image_filename): | |
return | |
image_data = np.frombuffer(msg.data, np.uint8) | |
image_np = cv2.imdecode(image_data, cv2.IMREAD_UNCHANGED) | |
start_time = time.time() | |
cv2.imwrite(image_filename, image_np) | |
end_time = time.time() | |
# Print the time taken to save | |
save_duration = end_time - start_time | |
self.get_logger().info(f'Saved image: {image_filename}, Time taken: {save_duration:.4f} seconds') | |
# Performance Monitoring | |
current_time = time.time() | |
frame_duration = current_time - self.last_image_save_time | |
self.last_image_save_time = current_time | |
# Issue a warning if saving can't keep up with the frame rate | |
if frame_duration > self.image_save_warning_threshold: | |
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds') | |
except Exception as e: | |
self.get_logger().error(f'Error processing image: {str(e)}') | |
def point_cloud_callback(self, msg): | |
try: | |
# Extract timestamp from the message | |
timestamp = msg.header.stamp | |
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6 | |
# get point cloud | |
data = rnp.numpify(msg) | |
dataXYZ = data['xyz'] | |
# Convert PointCloud2 message to PCL PointCloud | |
# Pass xyz to Open3D.o3d.geometry.PointCloud and visualize | |
pcd = o3d.geometry.PointCloud() | |
pcd.points = o3d.utility.Vector3dVector(dataXYZ) | |
# Remove NaN values | |
point_cloud = pcd | |
cleaned_points = np.asarray(point_cloud.points) | |
cleaned_points = cleaned_points[~np.isnan(cleaned_points).any(axis=1)] | |
point_cloud.points = o3d.utility.Vector3dVector(cleaned_points) | |
# Visualize the cleaned point cloud | |
# o3d.visualization.draw_geometries([point_cloud]) | |
# Create the directory if it doesn't exist | |
os.makedirs(self.point_cloud_directory, exist_ok=True) | |
# Save the PCL PointCloud as a PCD file | |
ply_filename = os.path.join(self.point_cloud_directory, f'{timestamp_ms:.4f}ms.ply') | |
# Skip saving if the file already exists | |
if os.path.exists(ply_filename): | |
return | |
start_time = time.time() | |
o3d.io.write_point_cloud(ply_filename, pcd) | |
end_time = time.time() | |
# Print the time taken to save | |
save_duration = end_time - start_time | |
self.get_logger().info(f'Saved point cloud: {ply_filename}, Time taken: {save_duration:.4f} seconds') | |
# Performance Monitoring | |
current_time = time.time() | |
frame_duration = current_time - self.last_save_time | |
self.last_save_time = current_time | |
# Issue a warning if saving can't keep up with the frame rate | |
if frame_duration > self.save_warning_threshold: | |
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds') | |
except Exception as e: | |
self.get_logger().error(f'Error processing point cloud: {str(e)}') | |
def main(): | |
rclpy.init() | |
data_extractor = DataExtractor() | |
rclpy.spin(data_extractor) | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment