Skip to content

Instantly share code, notes, and snippets.

@kor01
Last active August 30, 2017 00:16
Show Gist options
  • Save kor01/209143290779e6cae57175d4966bef3d to your computer and use it in GitHub Desktop.
Save kor01/209143290779e6cae57175d4966bef3d to your computer and use it in GitHub Desktop.
[RoboND_segmentation] segmentation clustering #robotics
DBSCAN: Density-Based Spatial Clustering of Application with Noise
Assummes: known distance boundary but not number of clusters
Also named as: Euclidean Clustering
two hyper parameters:
maximum distance within cluster
minimum number of points in a cluster
algorithm:
a point p is a core point if there are minPts within it's epsilon-neighbour
every point within its neighbour is said to be directly reachable from p
q is reachable from p iff there is a path p p1 ... pn q such that: every pi+1 is directly reachable from pi
points not reachable from any other points are outliers
indications:
no points are reachable from non-core point regardless of distance
two points p and q are density-connected if there is a point o such that both p and q are density-reachable from o
the connectness is symmetric
a cluster:
all points within the cluster are mutually density-conncted
a point is density reachable from any point of cluster is part of the cluster
the cluster identity of core point is unique, for non-core-point it is ambiguous
# create a node subscribe a point cloud
# publish the segmented point could
import rospy
import sensor_msgs.point_cloud2 as pc2
import pcl_helper as ph
import pcl_processor as pp
def pcl_callback(obj_pub, table_pub, msg):
# convert message to pcl
data = ph.ros_to_pcl(msg)
# downsample pcl
data = pp.downsample_pcl(data)
# crop segmentation
workspace = pp.workspace_filter(data)
# RANSAC table
table, objects = pp.segment_table_objects(workspace)
# clustering objects
clusters = pp.cluster_objects(objects)
# mark each object with distinct color
objects = pp.visualize_objects(objects, clusters)
# convert back to ros message and publish
obj_pub.publish(ph.pcl_to_ros(objects))
table_pub.publish(ph.pcl_to_ros(table))
def main():
rospy.init_node('clustering', anonymous=True)
objects_pub = rospy.Publisher('/pcl_objects', pc2.PointCloud2, queue_size=1)
table_pub = rospy.Publisher('/pcl_table', pc2.PointCloud2, queue_size=1)
_ = rospy.Subscriber(
'/sensor_stick/point_cloud',
pc2.PointCloud2, lambda x: pcl_callback(
objects_pub, table_pub, x), queue_size=1)
rospy.spin()
if __name__ == '__main__':
main()
# convert ros message to pcl library
import sensor_msgs.point_cloud2 as pc2
def ros_to_pcl(ros_cloud):
""" Converts a ROS PointCloud2 message to a pcl PointXYZRGB
Args:
ros_cloud (PointCloud2): ROS PointCloud2 message
Returns:
pcl.PointCloud_PointXYZRGB: PCL XYZRGB point cloud
"""
points_list = []
for data in pc2.read_points(ros_cloud, skip_nans=True):
points_list.append([data[0], data[1], data[2], data[3]])
pcl_data = pcl.PointCloud_PointXYZRGB()
pcl_data.from_list(points_list)
return pcl_data
# downsample pcl
def downsample_pcl(data):
vox = data.make_voxel_grid_filter()
vox.set_leaf_size(LEAVE_SIZE, LEAVE_SIZE, LEAVE_SIZE)
return vox.filter()
# crop pcl
def downsample_pcl(data):
vox = data.make_voxel_grid_filter()
vox.set_leaf_size(LEAVE_SIZE, LEAVE_SIZE, LEAVE_SIZE)
return vox.filter()
def downsample_pcl(data):
vox = data.make_voxel_grid_filter()
vox.set_leaf_size(LEAVE_SIZE, LEAVE_SIZE, LEAVE_SIZE)
return vox.filter()
def cluster_objects(objects):
objects = ph.XYZRGB_to_XYZ(objects)
tree = objects.make_kdtree()
ec = objects.make_EuclideanClusterExtraction()
ec.set_ClusterTolerance(0.05)
ec.set_MinClusterSize(10)
ec.set_MaxClusterSize(1000)
ec.set_SearchMethod(tree)
ret = ec.Extract()
return ret
def cluster_objects(objects):
objects = ph.XYZRGB_to_XYZ(objects)
tree = objects.make_kdtree()
ec = objects.make_EuclideanClusterExtraction()
ec.set_ClusterTolerance(0.05)
ec.set_MinClusterSize(10)
ec.set_MaxClusterSize(1000)
ec.set_SearchMethod(tree)
ret = ec.Extract()
return ret
from sensor_msgs.msg import PointCloud2, PointField
def pcl_to_ros(pcl_array):
""" Converts a ROS PointCloud2 message to a pcl PointXYZRGB
Args:
pcl_array (PointCloud_PointXYZRGB): A PCL XYZRGB point cloud
Returns:
PointCloud2: A ROS point cloud
"""
ros_msg = PointCloud2()
ros_msg.header.stamp = rospy.Time.now()
ros_msg.header.frame_id = "world"
ros_msg.height = 1
ros_msg.width = pcl_array.size
ros_msg.fields.append(PointField(
name="x",
offset=0,
datatype=PointField.FLOAT32, count=1))
ros_msg.fields.append(PointField(
name="y",
offset=4,
datatype=PointField.FLOAT32, count=1))
ros_msg.fields.append(PointField(
name="z",
offset=8,
datatype=PointField.FLOAT32, count=1))
ros_msg.fields.append(PointField(
name="rgb",
offset=16,
datatype=PointField.FLOAT32, count=1))
ros_msg.is_bigendian = False
ros_msg.point_step = 32
ros_msg.row_step = ros_msg.point_step * ros_msg.width * ros_msg.height
ros_msg.is_dense = False
buffer = []
for data in pcl_array:
s = struct.pack('>f', data[3])
i = struct.unpack('>l', s)[0]
pack = ctypes.c_uint32(i).value
r = (pack & 0x00FF0000) >> 16
g = (pack & 0x0000FF00) >> 8
b = (pack & 0x000000FF)
buffer.append(struct.pack('ffffBBBBIII', data[0], data[1], data[2], 1.0, b, g, r, 0, 0, 0, 0))
ros_msg.data = "".join(buffer)
return ros_msg
def XYZRGB_to_XYZ(XYZRGB_cloud):
""" Converts a PCL XYZRGB point cloud to an XYZ point cloud (removes color info)
Args:
XYZRGB_cloud (PointCloud_PointXYZRGB): A PCL XYZRGB point cloud
Returns:
PointCloud_PointXYZ: A PCL XYZ point cloud
"""
XYZ_cloud = pcl.PointCloud()
points_list = []
for data in XYZRGB_cloud:
points_list.append([data[0], data[1], data[2]])
XYZ_cloud.from_list(points_list)
return XYZ_cloud
def XYZ_to_XYZRGB(XYZ_cloud, color):
""" Converts a PCL XYZ point cloud to a PCL XYZRGB point cloud
All returned points in the XYZRGB cloud will be the color indicated
by the color parameter.
Args:
XYZ_cloud (PointCloud_XYZ): A PCL XYZ point cloud
color (list): 3-element list of integers [0-255,0-255,0-255]
Returns:
PointCloud_PointXYZRGB: A PCL XYZRGB point cloud
"""
XYZRGB_cloud = pcl.PointCloud_PointXYZRGB()
points_list = []
hex_r = (0xff & color[0]) << 16
hex_g = (0xff & color[1]) << 8
hex_b = (0xff & color[2])
hex_rgb = hex_r | hex_g | hex_b
float_rgb = struct.unpack('f', struct.pack('i', hex_rgb))[0]
for data in XYZ_cloud:
points_list.append([data[0], data[1], data[2], float_rgb])
XYZRGB_cloud.from_list(points_list)
return XYZRGB_cloud
def rgb_to_float(color):
""" Converts an RGB list to the packed float format used by PCL
From the PCL docs:
"Due to historical reasons (PCL was first developed as a ROS package),
the RGB information is packed into an integer and casted to a float"
Args:
color (list): 3-element list of integers [0-255,0-255,0-255]
Returns:
float: RGB value packed as a float
"""
hex_r = (0xff & color[0]) << 16
hex_g = (0xff & color[1]) << 8
hex_b = (0xff & color[2])
hex_rgb = hex_r | hex_g | hex_b
float_rgb = struct.unpack('f', struct.pack('i', hex_rgb))[0]
return float_rgb
COLOR_LIST = []
def get_color_list(cluster_count):
""" Returns a list of randomized colors
Args:
cluster_count (int): Number of random colors to generate
Returns:
(list): List containing 3-element color lists
"""
global COLOR_LIST
if cluster_count > len(COLOR_LIST):
for i in xrange(len(COLOR_LIST), cluster_count):
COLOR_LIST.append(random_color_gen())
return COLOR_LIST
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment