Skip to content

Instantly share code, notes, and snippets.

@SebastianGrans
Last active March 28, 2023 10:33
Show Gist options
  • Save SebastianGrans/dc301fa507910a4f079a0016bbcbd10a to your computer and use it in GitHub Desktop.
Save SebastianGrans/dc301fa507910a4f079a0016bbcbd10a to your computer and use it in GitHub Desktop.
ROS2 port of the `point_cloud2.py` script from ROS1.

In ROS1 there was a script called point_cloud2.py that was accessible as a module in the sensor_msgs package. I.e. by running from sensor_msgs import point_cloud2. It provided various functions for creating PointCloud2 messages from lists, and vice versa.

I have "ported" it to ROS2 and submitted a PR, but it is currently under review: ros2/common_interfaces#128

I've decided to also place it here to make it more visible and accessible (if the google search algorithms work).

#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Serialization of sensor_msgs.PointCloud2 messages.
Author: Tim Field
ROS2 port by Sebastian Grans (2020)
"""
import sys
from collections import namedtuple
import ctypes
import math
import struct
from sensor_msgs.msg import PointCloud2, PointField
_DATATYPES = {}
_DATATYPES[PointField.INT8] = ('b', 1)
_DATATYPES[PointField.UINT8] = ('B', 1)
_DATATYPES[PointField.INT16] = ('h', 2)
_DATATYPES[PointField.UINT16] = ('H', 2)
_DATATYPES[PointField.INT32] = ('i', 4)
_DATATYPES[PointField.UINT32] = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
"""
Read points from a L{sensor_msgs.PointCloud2} message.
@param cloud: The point cloud to read from.
@type cloud: L{sensor_msgs.PointCloud2}
@param field_names: The names of fields to read. If None, read all fields. [default: None]
@type field_names: iterable
@param skip_nans: If True, then don't return any point with a NaN value.
@type skip_nans: bool [default: False]
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
@type uvs: iterable
@return: Generator which yields a list of values for each point.
@rtype: generator
"""
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2'
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, \
cloud.point_step, cloud.row_step, \
cloud.data, math.isnan
unpack_from = struct.Struct(fmt).unpack_from
if skip_nans:
if uvs:
for u, v in uvs:
p = unpack_from(data, (row_step * v) + (point_step * u))
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
else:
for v in range(height):
offset = row_step * v
for u in range(width):
p = unpack_from(data, offset)
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
offset += point_step
else:
if uvs:
for u, v in uvs:
yield unpack_from(data, (row_step * v) + (point_step * u))
else:
for v in range(height):
offset = row_step * v
for u in range(width):
yield unpack_from(data, offset)
offset += point_step
def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]):
"""
Read points from a L{sensor_msgs.PointCloud2} message.
This function returns a list of namedtuples. It operates on top of the read_points method. For more efficient access use read_points directly.
@param cloud: The point cloud to read from.
@type cloud: L{sensor_msgs.PointCloud2}
@param field_names: The names of fields to read. If None, read all fields. [default: None]
@type field_names: iterable
@param skip_nans: If True, then don't return any point with a NaN value.
@type skip_nans: bool [default: False]
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
@type uvs: iterable
@return: List of namedtuples containing the values for each point
@rtype: list
"""
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2'
if field_names is None:
field_names = [f.name for f in cloud.fields]
Point = namedtuple("Point", field_names)
return [Point._make(l) for l in read_points(cloud, field_names, skip_nans, uvs)]
def create_cloud(header, fields, points):
"""
Create a L{sensor_msgs.msg.PointCloud2} message.
@param header: The point cloud header.
@type header: L{std_msgs.msg.Header}
@param fields: The point cloud fields.
@type fields: iterable of L{sensor_msgs.msg.PointField}
@param points: The point cloud points.
@type points: list of iterables, i.e. one iterable for each point, with the
elements of each iterable being the values of the fields for
that point (in the same order as the fields parameter)
@return: The point cloud.
@rtype: L{sensor_msgs.msg.PointCloud2}
"""
cloud_struct = struct.Struct(_get_struct_fmt(False, fields))
buff = ctypes.create_string_buffer(cloud_struct.size * len(points))
point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
offset = 0
for p in points:
pack_into(buff, offset, *p)
offset += point_step
return PointCloud2(header=header,
height=1,
width=len(points),
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=cloud_struct.size,
row_step=cloud_struct.size * len(points),
data=buff.raw)
def create_cloud_xyz32(header, points):
"""
Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z).
@param header: The point cloud header.
@type header: L{std_msgs.msg.Header}
@param points: The point cloud points.
@type points: iterable
@return: The point cloud.
@rtype: L{sensor_msgs.msg.PointCloud2}
"""
fields = [PointField('x', 0, PointField.FLOAT32, 1),
PointField('y', 4, PointField.FLOAT32, 1),
PointField('z', 8, PointField.FLOAT32, 1)]
return create_cloud(header, fields, points)
def _get_struct_fmt(is_bigendian, fields, field_names=None):
fmt = '>' if is_bigendian else '<'
offset = 0
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
if offset < field.offset:
fmt += 'x' * (field.offset - offset)
offset = field.offset
if field.datatype not in _DATATYPES:
print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
else:
datatype_fmt, datatype_length = _DATATYPES[field.datatype]
fmt += field.count * datatype_fmt
offset += field.count * datatype_length
return fmt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment