|
#!/usr/bin/env python |
|
|
|
# Software License Agreement (BSD License) |
|
# |
|
# Copyright (c) 2008, Willow Garage, Inc. |
|
# All rights reserved. |
|
# |
|
# Redistribution and use in source and binary forms, with or without |
|
# modification, are permitted provided that the following conditions |
|
# are met: |
|
# |
|
# * Redistributions of source code must retain the above copyright |
|
# notice, this list of conditions and the following disclaimer. |
|
# * Redistributions in binary form must reproduce the above |
|
# copyright notice, this list of conditions and the following |
|
# disclaimer in the documentation and/or other materials provided |
|
# with the distribution. |
|
# * Neither the name of Willow Garage, Inc. nor the names of its |
|
# contributors may be used to endorse or promote products derived |
|
# from this software without specific prior written permission. |
|
# |
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS |
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE |
|
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, |
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, |
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
# POSSIBILITY OF SUCH DAMAGE. |
|
|
|
""" |
|
Serialization of sensor_msgs.PointCloud2 messages. |
|
|
|
Author: Tim Field |
|
ROS2 port by Sebastian Grans (2020) |
|
""" |
|
|
|
import sys |
|
|
|
from collections import namedtuple |
|
import ctypes |
|
import math |
|
import struct |
|
|
|
from sensor_msgs.msg import PointCloud2, PointField |
|
|
|
_DATATYPES = {} |
|
_DATATYPES[PointField.INT8] = ('b', 1) |
|
_DATATYPES[PointField.UINT8] = ('B', 1) |
|
_DATATYPES[PointField.INT16] = ('h', 2) |
|
_DATATYPES[PointField.UINT16] = ('H', 2) |
|
_DATATYPES[PointField.INT32] = ('i', 4) |
|
_DATATYPES[PointField.UINT32] = ('I', 4) |
|
_DATATYPES[PointField.FLOAT32] = ('f', 4) |
|
_DATATYPES[PointField.FLOAT64] = ('d', 8) |
|
|
|
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]): |
|
""" |
|
Read points from a L{sensor_msgs.PointCloud2} message. |
|
|
|
@param cloud: The point cloud to read from. |
|
@type cloud: L{sensor_msgs.PointCloud2} |
|
@param field_names: The names of fields to read. If None, read all fields. [default: None] |
|
@type field_names: iterable |
|
@param skip_nans: If True, then don't return any point with a NaN value. |
|
@type skip_nans: bool [default: False] |
|
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list] |
|
@type uvs: iterable |
|
@return: Generator which yields a list of values for each point. |
|
@rtype: generator |
|
""" |
|
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2' |
|
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names) |
|
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, \ |
|
cloud.point_step, cloud.row_step, \ |
|
cloud.data, math.isnan |
|
unpack_from = struct.Struct(fmt).unpack_from |
|
|
|
if skip_nans: |
|
if uvs: |
|
for u, v in uvs: |
|
p = unpack_from(data, (row_step * v) + (point_step * u)) |
|
has_nan = False |
|
for pv in p: |
|
if isnan(pv): |
|
has_nan = True |
|
break |
|
if not has_nan: |
|
yield p |
|
else: |
|
for v in range(height): |
|
offset = row_step * v |
|
for u in range(width): |
|
p = unpack_from(data, offset) |
|
has_nan = False |
|
for pv in p: |
|
if isnan(pv): |
|
has_nan = True |
|
break |
|
if not has_nan: |
|
yield p |
|
offset += point_step |
|
else: |
|
if uvs: |
|
for u, v in uvs: |
|
yield unpack_from(data, (row_step * v) + (point_step * u)) |
|
else: |
|
for v in range(height): |
|
offset = row_step * v |
|
for u in range(width): |
|
yield unpack_from(data, offset) |
|
offset += point_step |
|
|
|
def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]): |
|
""" |
|
Read points from a L{sensor_msgs.PointCloud2} message. |
|
|
|
This function returns a list of namedtuples. It operates on top of the read_points method. For more efficient access use read_points directly. |
|
|
|
@param cloud: The point cloud to read from. |
|
@type cloud: L{sensor_msgs.PointCloud2} |
|
@param field_names: The names of fields to read. If None, read all fields. [default: None] |
|
@type field_names: iterable |
|
@param skip_nans: If True, then don't return any point with a NaN value. |
|
@type skip_nans: bool [default: False] |
|
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list] |
|
@type uvs: iterable |
|
@return: List of namedtuples containing the values for each point |
|
@rtype: list |
|
""" |
|
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2' |
|
|
|
if field_names is None: |
|
field_names = [f.name for f in cloud.fields] |
|
|
|
Point = namedtuple("Point", field_names) |
|
|
|
return [Point._make(l) for l in read_points(cloud, field_names, skip_nans, uvs)] |
|
|
|
def create_cloud(header, fields, points): |
|
""" |
|
Create a L{sensor_msgs.msg.PointCloud2} message. |
|
|
|
@param header: The point cloud header. |
|
@type header: L{std_msgs.msg.Header} |
|
@param fields: The point cloud fields. |
|
@type fields: iterable of L{sensor_msgs.msg.PointField} |
|
@param points: The point cloud points. |
|
@type points: list of iterables, i.e. one iterable for each point, with the |
|
elements of each iterable being the values of the fields for |
|
that point (in the same order as the fields parameter) |
|
@return: The point cloud. |
|
@rtype: L{sensor_msgs.msg.PointCloud2} |
|
""" |
|
cloud_struct = struct.Struct(_get_struct_fmt(False, fields)) |
|
|
|
buff = ctypes.create_string_buffer(cloud_struct.size * len(points)) |
|
|
|
point_step, pack_into = cloud_struct.size, cloud_struct.pack_into |
|
offset = 0 |
|
for p in points: |
|
pack_into(buff, offset, *p) |
|
offset += point_step |
|
|
|
return PointCloud2(header=header, |
|
height=1, |
|
width=len(points), |
|
is_dense=False, |
|
is_bigendian=False, |
|
fields=fields, |
|
point_step=cloud_struct.size, |
|
row_step=cloud_struct.size * len(points), |
|
data=buff.raw) |
|
|
|
def create_cloud_xyz32(header, points): |
|
""" |
|
Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z). |
|
|
|
@param header: The point cloud header. |
|
@type header: L{std_msgs.msg.Header} |
|
@param points: The point cloud points. |
|
@type points: iterable |
|
@return: The point cloud. |
|
@rtype: L{sensor_msgs.msg.PointCloud2} |
|
""" |
|
fields = [PointField('x', 0, PointField.FLOAT32, 1), |
|
PointField('y', 4, PointField.FLOAT32, 1), |
|
PointField('z', 8, PointField.FLOAT32, 1)] |
|
return create_cloud(header, fields, points) |
|
|
|
def _get_struct_fmt(is_bigendian, fields, field_names=None): |
|
fmt = '>' if is_bigendian else '<' |
|
offset = 0 |
|
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names): |
|
if offset < field.offset: |
|
fmt += 'x' * (field.offset - offset) |
|
offset = field.offset |
|
if field.datatype not in _DATATYPES: |
|
print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr) |
|
else: |
|
datatype_fmt, datatype_length = _DATATYPES[field.datatype] |
|
fmt += field.count * datatype_fmt |
|
offset += field.count * datatype_length |
|
|
|
return fmt |