-
-
Save awesomebytes/958a5ef9e63821a28dc05775840c34d9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import numpy as np | |
import cv2 | |
from cv_bridge import CvBridge, CvBridgeError | |
""" | |
ROS ImageTools, a class that contains methods to transform | |
from & to ROS Image, ROS CompressedImage & numpy.ndarray (cv2 image). | |
Also deals with Depth images, with a tricky catch, as they are compressed in | |
PNG, and we are here hardcoding to compression level 3 and the default | |
quantizations of the plugin. (What we use in our robots). | |
Meanwhile image_transport has no Python interface, this is the best we can do. | |
Author: Sammy Pfeiffer <Sammy.Pfeiffer at student.uts.edu.au> | |
""" | |
class ImageTools(object): | |
def __init__(self): | |
self._cv_bridge = CvBridge() | |
def convert_ros_msg_to_cv2(self, ros_data, image_encoding='bgr8'): | |
""" | |
Convert from a ROS Image message to a cv2 image. | |
""" | |
try: | |
return self._cv_bridge.imgmsg_to_cv2(ros_data, image_encoding) | |
except CvBridgeError as e: | |
if "[16UC1] is not a color format" in str(e): | |
raise CvBridgeError( | |
"You may be trying to use a Image method " + | |
"(Subscriber, Publisher, conversion) on a depth image" + | |
" message. Original exception: " + str(e)) | |
raise e | |
def convert_ros_compressed_to_cv2(self, compressed_msg): | |
np_arr = np.fromstring(compressed_msg.data, np.uint8) | |
return cv2.imdecode(np_arr, cv2.IMREAD_COLOR) | |
def convert_ros_compressed_msg_to_ros_msg(self, compressed_msg, | |
encoding='bgr8'): | |
cv2_img = self.convert_ros_compressed_to_cv2(compressed_msg) | |
ros_img = self._cv_bridge.cv2_to_imgmsg(cv2_img, encoding=encoding) | |
ros_img.header = compressed_msg.header | |
return ros_img | |
def convert_cv2_to_ros_msg(self, cv2_data, image_encoding='bgr8'): | |
""" | |
Convert from a cv2 image to a ROS Image message. | |
""" | |
return self._cv_bridge.cv2_to_imgmsg(cv2_data, image_encoding) | |
def convert_cv2_to_ros_compressed_msg(self, cv2_data, | |
compressed_format='jpg'): | |
""" | |
Convert from cv2 image to ROS CompressedImage. | |
""" | |
return self._cv_bridge.cv2_to_compressed_imgmsg(cv2_data, | |
dst_format=compressed_format) | |
def convert_ros_msg_to_ros_compressed_msg(self, image, | |
image_encoding='bgr8', | |
compressed_format="jpg"): | |
""" | |
Convert from ROS Image message to ROS CompressedImage. | |
""" | |
cv2_img = self.convert_ros_msg_to_cv2(image, image_encoding) | |
cimg_msg = self._cv_bridge.cv2_to_compressed_imgmsg(cv2_img, | |
dst_format=compressed_format) | |
cimg_msg.header = image.header | |
return cimg_msg | |
def convert_to_cv2(self, image): | |
""" | |
Convert any kind of image to cv2. | |
""" | |
cv2_img = None | |
if type(image) == np.ndarray: | |
cv2_img = image | |
elif image._type == 'sensor_msgs/Image': | |
cv2_img = self.convert_ros_msg_to_cv2(image) | |
elif image._type == 'sensor_msgs/CompressedImage': | |
cv2_img = self.convert_ros_compressed_to_cv2(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return cv2_img | |
def convert_to_ros_msg(self, image): | |
""" | |
Convert any kind of image to ROS Image. | |
""" | |
ros_msg = None | |
if type(image) == np.ndarray: | |
ros_msg = self.convert_cv2_to_ros_msg(image) | |
elif image._type == 'sensor_msgs/Image': | |
ros_msg = image | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_msg = self.convert_ros_compressed_msg_to_ros_msg(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_msg | |
def convert_to_ros_compressed_msg(self, image, compressed_format='jpg'): | |
""" | |
Convert any kind of image to ROS Compressed Image. | |
""" | |
ros_cmp = None | |
if type(image) == np.ndarray: | |
ros_cmp = self.convert_cv2_to_ros_compressed_msg( | |
image, compressed_format=compressed_format) | |
elif image._type == 'sensor_msgs/Image': | |
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg( | |
image, compressed_format=compressed_format) | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_cmp = image | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_cmp | |
def convert_depth_to_ros_msg(self, image): | |
ros_msg = None | |
if type(image) == np.ndarray: | |
ros_msg = self.convert_cv2_to_ros_msg(image, | |
image_encoding='mono16') | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = '16UC1' | |
ros_msg = image | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_msg = self.convert_compressedDepth_to_image_msg(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_msg | |
def convert_depth_to_ros_compressed_msg(self, image): | |
ros_cmp = None | |
if type(image) == np.ndarray: | |
ros_cmp = self.convert_cv2_to_ros_compressed_msg(image, | |
compressed_format='png') | |
ros_cmp.format = '16UC1; compressedDepth' | |
# This is a header ROS depth CompressedImage have, necessary | |
# for viewer tools to see the image | |
# extracted from a real image from a robot | |
# The code that does it in C++ is this: | |
# https://github.com/ros-perception/image_transport_plugins/blob/indigo-devel/compressed_depth_image_transport/src/codec.cpp | |
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = "mono16" | |
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg( | |
image, | |
image_encoding='mono16', | |
compressed_format='png') | |
ros_cmp.format = '16UC1; compressedDepth' | |
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_cmp = image | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_cmp | |
def convert_depth_to_cv2(self, image): | |
cv2_img = None | |
if type(image) == np.ndarray: | |
cv2_img = image | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = 'mono16' | |
cv2_img = self.convert_ros_msg_to_cv2(image, | |
image_encoding='mono16') | |
elif image._type == 'sensor_msgs/CompressedImage': | |
cv2_img = self.convert_compressedDepth_to_cv2(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return cv2_img | |
def convert_compressedDepth_to_image_msg(self, compressed_image): | |
""" | |
Convert a compressedDepth topic image into a ROS Image message. | |
compressed_image must be from a topic /bla/compressedDepth | |
as it's encoded in PNG | |
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ | |
""" | |
depth_img_raw = self.convert_compressedDepth_to_cv2(compressed_image) | |
img_msg = self._cv_bridge.cv2_to_imgmsg(depth_img_raw, "mono16") | |
img_msg.header = compressed_image.header | |
img_msg.encoding = "16UC1" | |
return img_msg | |
def convert_compressedDepth_to_cv2(self, compressed_depth): | |
""" | |
Convert a compressedDepth topic image into a cv2 image. | |
compressed_depth must be from a topic /bla/compressedDepth | |
as it's encoded in PNG | |
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ | |
""" | |
depth_fmt, compr_type = compressed_depth.format.split(';') | |
# remove white space | |
depth_fmt = depth_fmt.strip() | |
compr_type = compr_type.strip() | |
if compr_type != "compressedDepth": | |
raise Exception("Compression type is not 'compressedDepth'." | |
"You probably subscribed to the wrong topic.") | |
# remove header from raw data, if necessary | |
if 'PNG' in compressed_depth.data[:12]: | |
# If we compressed it with opencv, there is nothing to strip | |
depth_header_size = 0 | |
else: | |
# If it comes from a robot/sensor, it has 12 useless bytes apparently | |
depth_header_size = 12 | |
raw_data = compressed_depth.data[depth_header_size:] | |
depth_img_raw = cv2.imdecode(np.frombuffer(raw_data, np.uint8), | |
# the cv2.CV_LOAD_IMAGE_UNCHANGED has been removed | |
-1) # cv2.CV_LOAD_IMAGE_UNCHANGED) | |
if depth_img_raw is None: | |
# probably wrong header size | |
raise Exception("Could not decode compressed depth image." | |
"You may need to change 'depth_header_size'!") | |
return depth_img_raw | |
def display_image(self, image): | |
""" | |
Use cv2 to show an image. | |
""" | |
cv2_img = self.convert_to_cv2(image) | |
window_name = 'show_image press q to exit' | |
cv2.imshow(window_name, cv2_img) | |
# TODO: Figure out how to check if the window | |
# was closed... when a user does it, the program is stuck | |
key = cv2.waitKey(0) | |
if chr(key) == 'q': | |
cv2.destroyWindow(window_name) | |
def save_image(self, image, filename): | |
""" | |
Given an image in numpy array or ROS format | |
save it using cv2 to the filename. The extension | |
declares the type of image (e.g. .jpg .png). | |
""" | |
cv2_img = self.convert_to_cv2(image) | |
cv2.imwrite(filename, cv2_img) | |
def save_depth_image(self, image, filename): | |
""" | |
Save a normalized (easier to visualize) version | |
of a depth image into a file. | |
""" | |
# Duc's smart undocummented code | |
im_array = self.convert_depth_to_cv2(image) | |
min_distance, max_distance = np.min(im_array), np.max(im_array) | |
im_array = im_array * 1.0 | |
im_array = (im_array < max_distance) * im_array | |
im_array = (im_array - min_distance) / max_distance * 255.0 | |
im_array = (im_array >= 0) * im_array | |
cv2.imwrite(filename, im_array) | |
def load_from_file(self, file_path, cv2_imread_mode=None): | |
""" | |
Load image from a file. | |
:param file_path str: Path to the image file. | |
:param cv2_imread_mode int: cv2.IMREAD_ mode, modes are: | |
cv2.IMREAD_ANYCOLOR 4 cv2.IMREAD_REDUCED_COLOR_4 33 | |
cv2.IMREAD_ANYDEPTH 2 cv2.IMREAD_REDUCED_COLOR_8 65 | |
cv2.IMREAD_COLOR 1 cv2.IMREAD_REDUCED_GRAYSCALE_2 16 | |
cv2.IMREAD_GRAYSCALE 0 cv2.IMREAD_REDUCED_GRAYSCALE_4 32 | |
cv2.IMREAD_IGNORE_ORIENTATION 128 cv2.IMREAD_REDUCED_GRAYSCALE_8 64 | |
cv2.IMREAD_LOAD_GDAL 8 cv2.IMREAD_UNCHANGED -1 | |
cv2.IMREAD_REDUCED_COLOR_2 17 | |
""" | |
if cv2_imread_mode is not None: | |
img = cv2.imread(file_path, cv2_imread_mode) | |
img = cv2.imread(file_path) | |
if img is None: | |
raise RuntimeError("No image found to load at " + str(file_path)) | |
return img | |
if __name__ == '__main__': | |
from sensor_msgs.msg import Image, CompressedImage | |
it = ImageTools() | |
from cPickle import load | |
img = load(open('rgb_image.pickle', 'r')) | |
cv2_img = it.convert_ros_msg_to_cv2(img) | |
print(type(cv2_img)) | |
ros_img = it.convert_cv2_to_ros_msg(cv2_img) | |
print(type(ros_img)) | |
ros_compressed2 = it.convert_cv2_to_ros_compressed_msg(cv2_img) | |
print(type(ros_compressed2)) | |
compressed_ros_img = it.convert_ros_msg_to_ros_compressed_msg(ros_img) | |
print(type(compressed_ros_img)) | |
ros_img2 = it.convert_ros_compressed_msg_to_ros_msg(compressed_ros_img) | |
print(type(ros_img2)) | |
cv2_2 = it.convert_ros_compressed_to_cv2(compressed_ros_img) | |
print(type(cv2_2)) | |
cv2_3 = it.convert_to_cv2(cv2_img) | |
cv2_4 = it.convert_to_cv2(ros_img) | |
cv2_5 = it.convert_to_cv2(compressed_ros_img) | |
ros_3 = it.convert_to_ros_msg(cv2_img) | |
ros_4 = it.convert_to_ros_msg(ros_img) | |
ros_5 = it.convert_to_ros_msg(compressed_ros_img) | |
rosc_3 = it.convert_to_ros_compressed_msg(cv2_img) | |
rosc_4 = it.convert_to_ros_compressed_msg(ros_img) | |
rosc_5 = it.convert_to_ros_compressed_msg(compressed_ros_img) | |
depthcompimg = load(open('depth_compressed_image.pickle', 'r')) | |
depth_cv2 = it.convert_depth_to_cv2(depthcompimg) | |
depth_ros = it.convert_depth_to_ros_msg(depthcompimg) | |
depth_ros_comp = it.convert_depth_to_ros_compressed_msg(depthcompimg) | |
it.save_image(depth_cv2, 'depth_comp_cv2.jpg') | |
it.save_depth_image(depth_cv2, 'depth_comp_normalized_cv2.jpg') | |
depthimg = load(open('depth_image.pickle', 'r')) | |
depthimg_cv2 = it.convert_depth_to_cv2(depthimg) | |
depthimg_ros = it.convert_depth_to_ros_msg(depthimg) | |
depthimg_ros_comp = it.convert_depth_to_ros_compressed_msg(depthimg) | |
it.save_image(depth_cv2, 'depth_cv2.jpg') | |
it.save_depth_image(depth_cv2, 'depth_normalized_cv2.jpg') | |
it.save_image(cv2_img, 'cv2_image.jpg') | |
it.save_image(ros_img, 'ros_image.jpg') | |
it.save_image(compressed_ros_img, 'compressed_ros_image.jpg') | |
it.display_image(cv2_img) |
That's an odd format! Thanks for figuring it out. I'm going to quote here your reply there just for the sake of conservation of information:
Here is what I came up with- I haven't yet checked to make sure the depth values when decompressed are correct, they just seemed reasonable as seen in rqt:
import cv2
import numpy as np
from sensor_msgs.msg import CompressedImage
def encode_compressed_depth_image_msg(depth_image: np.array,
depth_min=1.0, depth_max=10.0,
depth_quantization=100.0):
depth_image[depth_image > depth_max] = np.nan
depth_image[depth_image < depth_min] = np.nan
depth_z0 = depth_quantization
depth_quant_a = depth_z0 * (depth_z0 + 1.0)
depth_quant_b = 1.0 - depth_quant_a / depth_max
inv_depth = (depth_quant_a / depth_image + depth_quant_b).astype(np.uint16)
depth_encoded = cv2.imencode(".png", inv_depth)[1]
header = np.array([0.0, depth_quant_a, depth_quant_b], np.float32)
compressed_depth_msg = CompressedImage()
compressed_depth_msg.format = "32FC1; compressedDepth png"
compressed_depth_msg.data = header.tobytes() + depth_encoded.tobytes()
return compressed_depth_msg
The payload of the CompressedDepth 32FC1; compressedDepth png
is 12 header bytes which contains the conversion scaling numbers quant a & b, then an encoded 16-bit grayscale png follows (if you lop off those first 12 bytes and save the rest of the bytes to a file you can display it in a png viewer and see the depth values). The scaling numbers fixed above but could be changed per-frame if there was a reason to, by manipulating depth_max and depth_quantization (or refactor so depth_quant_a and b are set through some other means).
@awesomebytes thank you very much for publishing these conversion tools. For 2D RGB images, they work really well.
@Mechazo11 You are welcome! I'm very happy to know my snippets help people! :)
I think I figured it out here https://answers.ros.org/question/407659/how-to-generate-format-32fc1-compresseddepth-png-image-in-python-from-float-array/ - and it turned out it isn't actually 32fc1 despite the format name, just uint16 with a scale and offset (and the depth values get inverted)