-
-
Save SebastianGrans/6ae5cab66e453a14a859b66cd9579239 to your computer and use it in GitHub Desktop.
import sys | |
import os | |
import rclpy | |
from rclpy.node import Node | |
import sensor_msgs.msg as sensor_msgs | |
import numpy as np | |
import open3d as o3d | |
class PCDListener(Node): | |
def __init__(self): | |
super().__init__('pcd_subsriber_node') | |
## This is for visualization of the received point cloud. | |
self.vis = o3d.visualization.Visualizer() | |
self.vis.create_window() | |
self.o3d_pcd = o3d.geometry.PointCloud() | |
# Set up a subscription to the 'pcd' topic with a callback to the | |
# function `listener_callback` | |
self.pcd_subscriber = self.create_subscription( | |
sensor_msgs.PointCloud2, # Msg type | |
'pcd', # topic | |
self.listener_callback, # Function to call | |
10 # QoS | |
) | |
def listener_callback(self, msg): | |
# Here we convert the 'msg', which is of the type PointCloud2. | |
# I ported the function read_points2 from | |
# the ROS1 package. | |
# https://github.com/ros/common_msgs/blob/noetic-devel/sensor_msgs/src/sensor_msgs/point_cloud2.py | |
pcd_as_numpy_array = np.array(list(read_points(msg))) | |
# The rest here is for visualization. | |
self.vis.remove_geometry(self.o3d_pcd) | |
self.o3d_pcd = o3d.geometry.PointCloud( | |
o3d.utility.Vector3dVector(pcd_as_numpy_array)) | |
self.vis.add_geometry(self.o3d_pcd) | |
self.vis.poll_events() | |
self.vis.update_renderer() | |
## The code below is "ported" from | |
# https://github.com/ros/common_msgs/tree/noetic-devel/sensor_msgs/src/sensor_msgs | |
# I'll make an official port and PR to this repo later: | |
# https://github.com/ros2/common_interfaces | |
import sys | |
from collections import namedtuple | |
import ctypes | |
import math | |
import struct | |
from sensor_msgs.msg import PointCloud2, PointField | |
_DATATYPES = {} | |
_DATATYPES[PointField.INT8] = ('b', 1) | |
_DATATYPES[PointField.UINT8] = ('B', 1) | |
_DATATYPES[PointField.INT16] = ('h', 2) | |
_DATATYPES[PointField.UINT16] = ('H', 2) | |
_DATATYPES[PointField.INT32] = ('i', 4) | |
_DATATYPES[PointField.UINT32] = ('I', 4) | |
_DATATYPES[PointField.FLOAT32] = ('f', 4) | |
_DATATYPES[PointField.FLOAT64] = ('d', 8) | |
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]): | |
""" | |
Read points from a L{sensor_msgs.PointCloud2} message. | |
@param cloud: The point cloud to read from. | |
@type cloud: L{sensor_msgs.PointCloud2} | |
@param field_names: The names of fields to read. If None, read all fields. [default: None] | |
@type field_names: iterable | |
@param skip_nans: If True, then don't return any point with a NaN value. | |
@type skip_nans: bool [default: False] | |
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list] | |
@type uvs: iterable | |
@return: Generator which yields a list of values for each point. | |
@rtype: generator | |
""" | |
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2' | |
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names) | |
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan | |
unpack_from = struct.Struct(fmt).unpack_from | |
if skip_nans: | |
if uvs: | |
for u, v in uvs: | |
p = unpack_from(data, (row_step * v) + (point_step * u)) | |
has_nan = False | |
for pv in p: | |
if isnan(pv): | |
has_nan = True | |
break | |
if not has_nan: | |
yield p | |
else: | |
for v in range(height): | |
offset = row_step * v | |
for u in range(width): | |
p = unpack_from(data, offset) | |
has_nan = False | |
for pv in p: | |
if isnan(pv): | |
has_nan = True | |
break | |
if not has_nan: | |
yield p | |
offset += point_step | |
else: | |
if uvs: | |
for u, v in uvs: | |
yield unpack_from(data, (row_step * v) + (point_step * u)) | |
else: | |
for v in range(height): | |
offset = row_step * v | |
for u in range(width): | |
yield unpack_from(data, offset) | |
offset += point_step | |
def _get_struct_fmt(is_bigendian, fields, field_names=None): | |
fmt = '>' if is_bigendian else '<' | |
offset = 0 | |
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names): | |
if offset < field.offset: | |
fmt += 'x' * (field.offset - offset) | |
offset = field.offset | |
if field.datatype not in _DATATYPES: | |
print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr) | |
else: | |
datatype_fmt, datatype_length = _DATATYPES[field.datatype] | |
fmt += field.count * datatype_fmt | |
offset += field.count * datatype_length | |
return fmt | |
def main(args=None): | |
# Boilerplate code. | |
rclpy.init(args=args) | |
pcd_listener = PCDListener() | |
rclpy.spin(pcd_listener) | |
# Destroy the node explicitly | |
# (optional - otherwise it will be done automatically | |
# when the garbage collector destroys the node object) | |
pcd_listener.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Hi!
I haven't worked with ROS in a while, but maybe my demo package can help you figure out how to use PointCloud2?
You can find it here: https://github.com/SebastianGrans/ROS2-Point-Cloud-Demo
Wow, it's very detailed and well made, thank you!!
it really helped me.
I've got a very big point cloud and the o3d.utility.Vector3dVector
line is throwing a runtime error without any description when using your sample above.
The following code works though. Sample from https://answers.ros.org/question/255351/how-o-save-a-pointcloud2-data-in-python/
gen = read_points(msg, skip_nans=True)
int_data = list(gen)
xyz = np.empty((len(int_data), 3))
rgb = np.empty((len(int_data), 3))
idx = 0
for x in int_data:
test = x[3]
# cast float32 to int so that bitwise operations are possible
s = struct.pack('>f' ,test)
i = struct.unpack('>l',s)[0]
# you can get back the float value by the inverse operations
pack = ctypes.c_uint32(i).value
r = (pack & 0x00FF0000)>> 16
g = (pack & 0x0000FF00)>> 8
b = (pack & 0x000000FF)
# prints r,g,b values in the 0-255 range
# x,y,z can be retrieved from the x[0],x[1],x[2]
# xyz = np.append(xyz,[[x[0],x[1],x[2]]], axis = 0)
# rgb = np.append(rgb,[[r,g,b]], axis = 0)
xyz[idx] = [x[0],x[1],x[2]]
rgb[idx] = [r,g,b]
idx = idx + 1
out_pcd = o3d.geometry.PointCloud()
out_pcd.points = o3d.utility.Vector3dVector(xyz)
out_pcd.colors = o3d.utility.Vector3dVector(rgb)
Any idea why the numpy_array returned from your code sample would not work?
I solved the error changing this line in read_points()
yield unpack_from(data, offset)[:3]
It because this line was returning lists of 4 positions and o3d.utility.Vector3dVector
works with lists of 3 positions.
Note: I don't know for what the 4 position value is for. Maybe it is the point color packed in float.
Thanks for the solution!
Hey! thanks for sharing your code here!
Do you think you can help me with something similar to this code right here?
I want to generate an ROS file from existing pointcloud, I use rosbag to create the ros file, but I'm not familiar with PointCloud2.
So my question is how can I create a PointCloud2 instance with demo data? only if you know and have time to help, would greatly appreciate it.