Created
May 19, 2021 08:22
-
-
Save horverno/b8d72f996b84c0925f019f0c770a2eae to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import os | |
import struct | |
import numpy as np | |
import rospy | |
from std_msgs.msg import Header | |
from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2, PointField | |
from image_geometry import PinholeCameraModel | |
import message_filters | |
from timeit import default_timer as timer | |
import tf # geometry2 transforms | |
from cv_bridge import CvBridge, CvBridgeError | |
import math | |
import cv2 | |
class CameraLidarFusionLanesROSNode(): | |
""" | |
Advertises a topic on ground pixels with depth information (x, y, z). | |
Args: | |
point_freq (int): Used lidar point frequency (1 is most dense). | |
Recommended values are 3 and 5. | |
""" | |
def __init__(self, point_freq=1): | |
self.CvBridge = CvBridge() | |
self.Camera_subscriber = message_filters.Subscriber( | |
#"/zed_node/left/image_rect_color/compressed", CompressedImage ###### ez ###### | |
"/zed_node/right_raw/image_raw_color", Image ###### ez ###### | |
) | |
self.Lidar_subscriber = message_filters.Subscriber( | |
"/right_os1/os1_cloud_node/points", PointCloud2 | |
) | |
self.Synchronizer = message_filters.ApproximateTimeSynchronizer( | |
fs=[self.Camera_subscriber, self.Lidar_subscriber], | |
queue_size=1, # set of messages stored per 'message_filter' | |
slop=0.2, # range of delay for synchronization | |
allow_headerless=True | |
) | |
self.Synchronizer.registerCallback(self.fusion_callback) | |
self.CameraModel = PinholeCameraModel() | |
# translation | |
self.Trans = [0.6, -0.10, 0.0] | |
# rotation | |
self.Rot = [1.57, -1.57, 0.0] ###### ez ###### | |
#self.Rot = [1.57, 1.57, 0.0] ###### ez ###### | |
self.RotationMatrix = None | |
self.point_freq = point_freq | |
self.lidar_data = np.ones((4, 32768))[:, ::self.point_freq] | |
self.Points = None | |
self.PointsToCloud = None | |
# call camera calibration | |
self.calibrate_camera() | |
def calculate_rotation_matrix(self): | |
""" | |
Calculates rotation matrix from translation and rotation. | |
""" | |
self.Trans = tuple(self.Trans) + (1, ) | |
self.RotationMatrix = np.asarray( | |
tf.transformations.euler_matrix( | |
self.Rot[0], self.Rot[1], self.Rot[2] | |
) | |
) | |
self.RotationMatrix[:, 3] = np.asarray(self.Trans) | |
def calibrate_camera(self): | |
""" | |
Calibrate camera and call rotation matrix calculation. | |
""" | |
camera_info = rospy.wait_for_message("/zed_node/left/camera_info", CameraInfo) | |
self.CameraModel.fromCameraInfo(camera_info) | |
print('[INFO] Calibration completed!') | |
self.calculate_rotation_matrix() | |
print('[INFO] Rotation Matrix calculated!') | |
def fusion_callback(self, camera_ros_msg, lidar_ros_msg): | |
""" | |
Main callback used in ApproximateTimeSynchronizer() to | |
fuse camera ground points (x, y) with depth information (z). | |
Args: | |
camera_ros_msg: Raw camera message from a message_filter. | |
pred_ros_msg: Segmentation message from a message_filter. | |
lidar_ros_msg: Raw lidar message from a message_filter. | |
""" | |
start_time = timer() | |
try: | |
np_arr = np.fromstring(camera_ros_msg.data, np.uint8) ###### ez ###### | |
image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) ###### ez ###### | |
#image = self.CvBridge.imgmsg_to_cv2(camera_ros_msg, "bgr8") ###### ez ###### | |
except CvBridgeError as e: | |
print(e) | |
#self.Points = [] | |
self.Points = [struct.unpack("<fff", lidar_ros_msg.data[idx:idx + 12]) for idx in range(0, len(lidar_ros_msg.data), 48 * self.point_freq)] | |
#for idx in range(0, len(lidar_ros_msg.data), 48 * self.point_freq): | |
# self.Points.append(struct.unpack("<fff", lidar_ros_msg.data[idx:idx + 12])) | |
self.Points = np.swapaxes(np.array(self.Points), 0, 1) | |
size = len(self.Points[0]) | |
self.lidar_data[:-1, :] = self.Points | |
newlidardata = [] | |
# dot exception occours sometimes that needs to be caught | |
# TODO: nem az osszeset transzformalni, kiszurni ami biztos nem esik a szegmentacio teruletere | |
# LINE PROFILER | |
try: | |
rotatedPointList = self.RotationMatrix.dot(self.lidar_data) | |
except AttributeError as e: | |
print(e) | |
return | |
x = rotatedPointList[0, :] | |
y = rotatedPointList[1, :] | |
z = rotatedPointList[2, :] | |
u = (self.CameraModel.fx() * x) / z + self.CameraModel.cx() | |
v = (self.CameraModel.fy() * y) / z + self.CameraModel.cy() | |
self.PointsToCloud = [] | |
#self.PointsToCloud = [[self.lidar_data[0][i], self.lidar_data[1][i], self.lidar_data[2][i], image[int(v[i]), int(u[i]), 2] / 255.0, image[int(v[i]), int(u[i]), 1] / 255.0, image[int(v[i]), int(u[i]), 0] / 255.0, 0] for i in range(0, size) if not(math.isinf(v[i])) and not(math.isinf(u[i])) and self.lidar_data[0][i] <= 0.0 and int(v[i]) >= 0 and int(v[i]) < image.shape[0] and int(u[i]) >= 0 and int(u[i]) < image.shape[1]] | |
for i in range(0, size, 1): | |
try: | |
if ( | |
###### ez ###### | |
#self.lidar_data[0][i] >= 0.0 and \ | |
###### ez ###### | |
self.lidar_data[0][i] <= 0.0 and \ | |
int(v[i]) >= 0 and int(v[i]) < image.shape[0] and \ | |
int(u[i]) >= 0 and int(u[i]) < image.shape[1] | |
): | |
(b, g, r) = image[int(v[i]), int(u[i]), :] | |
self.PointsToCloud.append( | |
[self.lidar_data[0][i], self.lidar_data[1][i], self.lidar_data[2][i], | |
r / 255.0, g / 255.0, b / 255.0, 0 | |
] | |
) | |
except: | |
pass | |
#dim 0 components --> main elements, dim 1 components --> (x, y, z, r, g, b, a) | |
self.PointsToCloud = np.array(self.PointsToCloud) | |
# ROS message attributes for publishing | |
ros_dtype = PointField.FLOAT32 | |
dtype = np.float32 | |
itemsize = np.dtype(dtype).itemsize | |
pcdata = self.PointsToCloud.astype(dtype).tobytes() | |
# create fields with list comprehension | |
fields = [ | |
PointField(name=n, | |
offset=i * itemsize, | |
datatype=ros_dtype, | |
count=1 | |
) for i, n in enumerate('xyzrgba') | |
] | |
header = Header(frame_id="right_os1/os1_lidar", stamp=rospy.Time.now()) | |
# Publish as ROS topic | |
lane_xyz_data = PointCloud2( | |
header=header, | |
height=1, | |
width=self.PointsToCloud.shape[0], | |
is_dense=False, | |
is_bigendian=False, | |
fields=fields, | |
point_step=(itemsize * 7), | |
row_step=(itemsize * 7 * self.PointsToCloud.shape[0]), | |
data=pcdata | |
) | |
publisher = rospy.Publisher('camera_stream/lidar_fusion/lanes', PointCloud2, queue_size=1) | |
publisher.publish(lane_xyz_data) | |
end_time = timer() | |
delta = end_time - start_time | |
print("[INFO] Callback Time: {} s".format(delta)) | |
print("[INFO] FPS: {} Hz".format(1 / delta)) | |
def main(self): | |
rospy.spin() | |
if __name__ == '__main__': | |
rospy.init_node('fusion_all_py', anonymous=True) | |
sensor_fusion_ros = CameraLidarFusionLanesROSNode() | |
sensor_fusion_ros.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment