Skip to content

Instantly share code, notes, and snippets.

@Merwanski
Created January 15, 2024 20:58
Show Gist options
  • Save Merwanski/8a3d12a1a0a2a427eff202587ac7fa04 to your computer and use it in GitHub Desktop.
Save Merwanski/8a3d12a1a0a2a427eff202587ac7fa04 to your computer and use it in GitHub Desktop.
ros2bag_data_extractor.py
import os
import cv2
import open3d as o3d
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import CompressedImage
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
import pcl
import time
import ros2_numpy as rnp
class DataExtractor(Node):
def __init__(self):
super().__init__('data_extractor')
self.create_subscription(
CompressedImage,
'/zed2i/zed_node/left/image_rect_color/compressed',
self.image_callback,
QoSProfile(depth=100),
)
self.create_subscription(
PointCloud2,
'/zed2i/zed_node/point_cloud/cloud_registered',
self.point_cloud_callback,
QoSProfile(depth=100),
)
script_directory = os.path.dirname(os.path.abspath(__file__))
self.image_directory = os.path.join(script_directory, 'dataExtracted', 'leftImages')
self.point_cloud_directory = os.path.join(script_directory, 'dataExtracted', 'pointCloud')
# Performance Monitoring
self.last_save_time = time.time()
self.last_image_save_time = time.time()
self.save_warning_threshold = 0.5 # seconds
self.image_save_warning_threshold = 0.5 # seconds
self.get_logger().info('Data Extractor is ready.')
def image_callback(self, msg):
try:
# Extract timestamp from the message
timestamp = msg.header.stamp
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6
# Create the directory if it doesn't exist
os.makedirs(self.image_directory, exist_ok=True)
# Construct the file path
image_filename = os.path.join(self.image_directory, f'{timestamp_ms:.4f}ms.png')
# Skip saving if the file already exists
if os.path.exists(image_filename):
return
image_data = np.frombuffer(msg.data, np.uint8)
image_np = cv2.imdecode(image_data, cv2.IMREAD_UNCHANGED)
start_time = time.time()
cv2.imwrite(image_filename, image_np)
end_time = time.time()
# Print the time taken to save
save_duration = end_time - start_time
self.get_logger().info(f'Saved image: {image_filename}, Time taken: {save_duration:.4f} seconds')
# Performance Monitoring
current_time = time.time()
frame_duration = current_time - self.last_image_save_time
self.last_image_save_time = current_time
# Issue a warning if saving can't keep up with the frame rate
if frame_duration > self.image_save_warning_threshold:
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds')
except Exception as e:
self.get_logger().error(f'Error processing image: {str(e)}')
def point_cloud_callback(self, msg):
try:
# Extract timestamp from the message
timestamp = msg.header.stamp
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6
# get point cloud
data = rnp.numpify(msg)
dataXYZ = data['xyz']
# Convert PointCloud2 message to PCL PointCloud
# Pass xyz to Open3D.o3d.geometry.PointCloud and visualize
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(dataXYZ)
# Remove NaN values
point_cloud = pcd
cleaned_points = np.asarray(point_cloud.points)
cleaned_points = cleaned_points[~np.isnan(cleaned_points).any(axis=1)]
point_cloud.points = o3d.utility.Vector3dVector(cleaned_points)
# Visualize the cleaned point cloud
# o3d.visualization.draw_geometries([point_cloud])
# Create the directory if it doesn't exist
os.makedirs(self.point_cloud_directory, exist_ok=True)
# Save the PCL PointCloud as a PCD file
ply_filename = os.path.join(self.point_cloud_directory, f'{timestamp_ms:.4f}ms.ply')
# Skip saving if the file already exists
if os.path.exists(ply_filename):
return
start_time = time.time()
o3d.io.write_point_cloud(ply_filename, pcd)
end_time = time.time()
# Print the time taken to save
save_duration = end_time - start_time
self.get_logger().info(f'Saved point cloud: {ply_filename}, Time taken: {save_duration:.4f} seconds')
# Performance Monitoring
current_time = time.time()
frame_duration = current_time - self.last_save_time
self.last_save_time = current_time
# Issue a warning if saving can't keep up with the frame rate
if frame_duration > self.save_warning_threshold:
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds')
except Exception as e:
self.get_logger().error(f'Error processing point cloud: {str(e)}')
def main():
rclpy.init()
data_extractor = DataExtractor()
rclpy.spin(data_extractor)
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment