Created
August 12, 2015 16:12
-
-
Save awesomebytes/d51fbd77ab1b887e7c3e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import rospy | |
| from sensor_msgs.msg import Image, CompressedImage | |
| import fcntl, sys, os | |
| from v4l2 import * | |
| import time | |
| import scipy.misc as misc | |
| import numpy as np | |
| import cv2 | |
| from cv_bridge import CvBridge, CvBridgeError | |
| def timing(f): | |
| def wrap(*args): | |
| time1 = time.time() | |
| ret = f(*args) | |
| time2 = time.time() | |
| print ' function took %0.3f ms' % ( (time2-time1)*1000.0) | |
| return ret | |
| return wrap | |
| @timing | |
| def ConvertToYUYV2(sizeimage, bytesperline, im): | |
| padding = 4096 | |
| buff = np.zeros((sizeimage+padding, ), dtype=np.uint8) | |
| imgrey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY) | |
| #imgrey = im[:,:,0] * 0.299 + im[:,:,1] * 0.587 + im[:,:,2] * 0.114 | |
| time1 = time.time() | |
| Pb = im[:,:,0] * -0.168736 + im[:,:,1] * -0.331264 + im[:,:,2] * 0.5 | |
| time2 = time.time() | |
| print 'Pb generation took %0.3f ms' % ( (time2-time1)*1000.0) | |
| time1 = time.time() | |
| Pr = im[:,:,0] * 0.5 + im[:,:,1] * -0.418688 + im[:,:,2] * -0.081312 | |
| time2 = time.time() | |
| print 'Pr generation took %0.3f ms' % ( (time2-time1)*1000.0) | |
| time1 = time.time() | |
| for y in range(imgrey.shape[0]): | |
| #Set lumenance | |
| cursor = y * bytesperline + padding | |
| for x in range(imgrey.shape[1]): | |
| try: | |
| buff[cursor] = imgrey[y, x] | |
| except IndexError: | |
| pass | |
| cursor += 2 | |
| #Set color information for Cb | |
| cursor = y * bytesperline + padding | |
| for x in range(0, imgrey.shape[1], 2): | |
| try: | |
| buff[cursor+1] = 0.5 * (Pb[y, x] + Pb[y, x+1]) + 128 | |
| except IndexError: | |
| pass | |
| cursor += 4 | |
| #Set color information for Cr | |
| cursor = y * bytesperline + padding | |
| for x in range(0, imgrey.shape[1], 2): | |
| try: | |
| buff[cursor+3] = 0.5 * (Pr[y, x] + Pr[y, x+1]) + 128 | |
| except IndexError: | |
| pass | |
| cursor += 4 | |
| time2 = time.time() | |
| print 'loops took %0.3f ms' % ( (time2-time1)*1000.0) | |
| return buff.tostring() | |
| @timing | |
| def ConvertToYUYV2_fast(sizeimage, bytesperline, im): | |
| #buff = ConvertToYUYV(im, width=im.shape[1], height=im.shape[0]) | |
| #return buff | |
| padding = 4096 | |
| buff = np.zeros((sizeimage+padding, ), dtype=np.uint8) | |
| #buff.setflags(write=True) | |
| imgrey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY) | |
| #imgrey = im[:,:,0] * 0.299 + im[:,:,1] * 0.587 + im[:,:,2] * 0.114 | |
| time1 = time.time() | |
| Pb = im[:,:,0] * -0.168736 + im[:,:,1] * -0.331264 + im[:,:,2] * 0.5 | |
| time2 = time.time() | |
| print 'Pb generation took %0.3f ms' % ( (time2-time1)*1000.0) | |
| time1 = time.time() | |
| Pr = im[:,:,0] * 0.5 + im[:,:,1] * -0.418688 + im[:,:,2] * -0.081312 | |
| time2 = time.time() | |
| print 'Pr generation took %0.3f ms' % ( (time2-time1)*1000.0) | |
| time11 = time.time() | |
| rows = imgrey.shape[0] | |
| cols = imgrey.shape[1] | |
| print "rows: " + str(rows) | |
| print "cols: " + str(cols) | |
| for y in range(rows): | |
| #Set luminance | |
| cursor = y * bytesperline + padding | |
| # time1 = time.time() | |
| for x in range(cols): | |
| buff[cursor] = imgrey[y, x] | |
| cursor += 2 | |
| # time2 = time.time() | |
| # print 'first loop took took %0.3f ms' % ( (time2-time1)*1000.0) | |
| cursor = y * bytesperline + padding | |
| # time1 = time.time() | |
| for x in range(0, cols, 2): | |
| #Set color information for Cb | |
| buff[cursor+1] = 0.5 * (Pb[y, x] + Pb[y, x+1]) + 128 | |
| cursor += 4 | |
| cursor = y * bytesperline + padding | |
| for x in range(0, cols, 2): | |
| #Set color information for Cr | |
| buff[cursor+3] = 0.5 * (Pr[y, x] + Pr[y, x+1]) + 128 | |
| cursor += 4 | |
| # time2 = time.time() | |
| # print 'second loop took took %0.3f ms' % ( (time2-time1)*1000.0) | |
| time22 = time.time() | |
| print 'loops took %0.3f ms' % ( (time22-time11)*1000.0) | |
| return buff #.tostring() | |
| def ConvertToYUYV(image, width=None, height=None): | |
| imsize = image.shape[0] * image.shape[1] * 2 | |
| buff = np.zeros((imsize), dtype=np.uint8) | |
| img = cv2.cvtColor(image, cv2.COLOR_BGR2YUV).ravel() | |
| Ys = np.arange(0, img.shape[0], 3) | |
| Vs = np.arange(1, img.shape[0], 6) | |
| Us = np.arange(2, img.shape[0], 6) | |
| BYs = np.arange(0, buff.shape[0], 2) | |
| BUs = np.arange(1, buff.shape[0], 4) | |
| BVs = np.arange(3, buff.shape[0], 4) | |
| buff[BYs] = img[Ys] | |
| buff[BUs] = img[Us] | |
| buff[BVs] = img[Vs] | |
| return buff | |
| class VideoDeviceFromTopic(object): | |
| def __init__(self): | |
| rospy.loginfo("Initializing VideoDeviceFromTopic") | |
| self.device_name = rospy.get_param('video_device', '/dev/video26') | |
| rospy.loginfo("Using device: " + str(self.device_name)) | |
| self.fps = rospy.get_param('fps', 30) | |
| rospy.loginfo("Will write images at fps: " + str(self.fps)) | |
| if not os.path.exists(self.device_name ): | |
| rospy.logerr("Warning: device does not exist: " + str(self.device_name)) | |
| exit(0) | |
| self.bridge = CvBridge() | |
| self.device = open(self.device_name, 'w') | |
| self.last_img = None | |
| self.initialized = False | |
| self.format = None | |
| self.sub = rospy.Subscriber('image', Image, self.image_cb, queue_size=1) | |
| #self.subcompressed = rospy.Subscriber('/fisheye/image_raw/compressed', CompressedImage, self.image_cb, queue_size=1) | |
| #rospy.loginfo("Subscribed to Image topic: " + str(self.sub.resolved_name)) | |
| def image_cb(self, data): | |
| """ | |
| :type data: Image | |
| :return: | |
| """ | |
| self.last_img = data | |
| def run(self): | |
| rospy.loginfo("Waiting for first image...") | |
| while self.last_img is None: | |
| rospy.sleep(0.1) | |
| r = rospy.Rate(self.fps) | |
| while not rospy.is_shutdown(): | |
| #self.insert_frame() | |
| #self.insert_frame_slow() | |
| self.insert_frame_slow_uncompressed() | |
| #self.insert_frame_slow_bgr() | |
| #self.insert_lenna() | |
| r.sleep() | |
| def insert_frame(self): | |
| # np_arr = np.fromstring(self.last_img.data, np.uint8) | |
| # image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) | |
| if not self.initialized: | |
| # if type(self.last_img) == type(Image()): | |
| width = self.last_img.width | |
| height = self.last_img.height | |
| # else: | |
| # width = image_np.shape[0] | |
| # height = image_np.shape[1] | |
| # width = 512 | |
| # height = 512 | |
| print "image size: " | |
| print width | |
| print height | |
| capability = v4l2_capability() | |
| print "get capabilities result", (fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)) | |
| print "capabilities", hex(capability.capabilities) | |
| #fmt = V4L2_PIX_FMT_YUYV # this worked almost | |
| fmt = V4L2_PIX_FMT_BGR24 | |
| #fmt = V4L2_PIX_FMT_RGB24 | |
| #fmt = V4L2_PIX_FMT_YVU420 | |
| print("v4l2 driver: " + capability.driver) | |
| self.format = v4l2_format() | |
| self.format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT | |
| self.format.fmt.pix.pixelformat = fmt | |
| self.format.fmt.pix.width = width | |
| self.format.fmt.pix.height = height | |
| self.format.fmt.pix.field = V4L2_FIELD_NONE | |
| self.format.fmt.pix.bytesperline = width * 2 # was 2 | |
| self.format.fmt.pix.sizeimage = width * height * 2 # was 2 | |
| self.format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG | |
| #self.format.fmt.pix.colorspace = V4L2_COLORSPACE_SRGB | |
| print "set format result", (fcntl.ioctl(self.device, VIDIOC_S_FMT, self.format)) | |
| self.initialized = True | |
| #Note that format.fmt.pix.sizeimage and format.fmt.pix.bytesperline | |
| #may have changed at this point | |
| #Create image buffer | |
| #im = misc.imread("Lenna.png") # image as array | |
| #print "im is: " | |
| #print type(im) | |
| #print im | |
| cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough") | |
| #cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "rgb8") | |
| # its bgr8 because the ros msg says so | |
| #cv2.cvtColor(cv_image, cv_image, cv2.COLOR_BGR2YCR_CB) | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YUV) | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YCR_CB) # almooooooooost | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YUV) # very close too | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YUV_I420) | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YUV_IYUV) | |
| #cv_image = cv2.cvtColor( cv_image, cv2.COLOR_BGR2YUV_YV12) | |
| #buff = cv_image #.tostring() | |
| # if not self.initialized: | |
| # buff = np.zeros((4096,), dtype=np.uint8) | |
| # np.append(buff, cv_image) | |
| # else: | |
| # buff = cv_image | |
| buff = cv_image | |
| #padding = 4096 | |
| #buff = np.zeros((padding, ), dtype=np.uint8) | |
| #np.append(buff, cv_image) | |
| #buff = buff.tostring() | |
| # print "cv image is" | |
| # print type(cv_image) | |
| # print cv_image | |
| #buff = ConvertToYUYV2(format.fmt.pix.sizeimage, format.fmt.pix.bytesperline, cv_image) | |
| #buff = np.array(cv2.imencode('.jpg', cv_image)[1]).tostring() | |
| #buff = ConvertToYUYV(cv_image) | |
| #buff = ConvertToYUYV2(format.fmt.pix.sizeimage, format.fmt.pix.bytesperline, np_arr) | |
| #buff = ConvertToYUYV2(format.fmt.pix.sizeimage, format.fmt.pix.bytesperline, image_np) | |
| #buff = ConvertToYUYV2(format.fmt.pix.sizeimage, format.fmt.pix.bytesperline, im) | |
| #buff = ConvertToYUYV(im) | |
| self.device.write(buff) | |
| #self.device.write(buff) | |
| def insert_frame_slow_uncompressed(self): | |
| image_np = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough") | |
| image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB) | |
| if not self.initialized: | |
| # if type(self.last_img) == type(Image()): | |
| #width = image_np.shape[1] | |
| #height = image_np.shape[0] | |
| width = self.last_img.width | |
| height = self.last_img.height | |
| print "image size: " | |
| print width | |
| print height | |
| capability = v4l2_capability() | |
| print "get capabilities result", (fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)) | |
| print "capabilities", hex(capability.capabilities) | |
| fmt = V4L2_PIX_FMT_YUYV # this worked almost | |
| #fmt = V4L2_PIX_FMT_BGR24 | |
| #fmt = V4L2_PIX_FMT_RGB24 | |
| #fmt = V4L2_PIX_FMT_YVU420 | |
| print("v4l2 driver: " + capability.driver) | |
| self.format = v4l2_format() | |
| self.format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT | |
| self.format.fmt.pix.pixelformat = fmt | |
| self.format.fmt.pix.width = width | |
| self.format.fmt.pix.height = height | |
| self.format.fmt.pix.field = V4L2_FIELD_NONE | |
| self.format.fmt.pix.bytesperline = width * 2 # was 2 | |
| self.format.fmt.pix.sizeimage = width * height * 2 # was 2 | |
| self.format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG | |
| #self.format.fmt.pix.colorspace = V4L2_COLORSPACE_SRGB | |
| print "set format result", (fcntl.ioctl(self.device, VIDIOC_S_FMT, self.format)) | |
| self.initialized = True | |
| #Note that format.fmt.pix.sizeimage and format.fmt.pix.bytesperline | |
| #may have changed at this point | |
| #print "convert img passthrough" | |
| #cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough") | |
| #cv_image.tostring() | |
| print "convert to yuyv2" | |
| print "format sizeiamge: " + str(self.format.fmt.pix.sizeimage) | |
| print "format bytesperline: " + str(self.format.fmt.pix.bytesperline) | |
| #buff = ConvertToYUYV2(self.format.fmt.pix.sizeimage, self.format.fmt.pix.bytesperline, image_np) | |
| buff = ConvertToYUYV2_fast(self.format.fmt.pix.sizeimage, self.format.fmt.pix.bytesperline, image_np) | |
| print "write to device" | |
| self.device.write(buff) | |
| def insert_frame_slow(self): | |
| np_arr = np.fromstring(self.last_img.data, np.uint8) | |
| image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) | |
| image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB) | |
| if not self.initialized: | |
| # if type(self.last_img) == type(Image()): | |
| width = image_np.shape[1] | |
| height = image_np.shape[0] | |
| #width = self.last_img.width | |
| #height = self.last_img.height | |
| print "image size: " | |
| print width | |
| print height | |
| capability = v4l2_capability() | |
| print "get capabilities result", (fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)) | |
| print "capabilities", hex(capability.capabilities) | |
| fmt = V4L2_PIX_FMT_YUYV # this worked almost | |
| #fmt = V4L2_PIX_FMT_BGR24 | |
| #fmt = V4L2_PIX_FMT_RGB24 | |
| #fmt = V4L2_PIX_FMT_YVU420 | |
| print("v4l2 driver: " + capability.driver) | |
| self.format = v4l2_format() | |
| self.format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT | |
| self.format.fmt.pix.pixelformat = fmt | |
| self.format.fmt.pix.width = width | |
| self.format.fmt.pix.height = height | |
| self.format.fmt.pix.field = V4L2_FIELD_NONE | |
| self.format.fmt.pix.bytesperline = width * 2 # was 2 | |
| self.format.fmt.pix.sizeimage = width * height * 2 # was 2 | |
| self.format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG | |
| #self.format.fmt.pix.colorspace = V4L2_COLORSPACE_SRGB | |
| print "set format result", (fcntl.ioctl(self.device, VIDIOC_S_FMT, self.format)) | |
| self.initialized = True | |
| #Note that format.fmt.pix.sizeimage and format.fmt.pix.bytesperline | |
| #may have changed at this point | |
| #print "convert img passthrough" | |
| #cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough") | |
| #cv_image.tostring() | |
| print "convert to yuyv2" | |
| print "format sizeiamge: " + str(self.format.fmt.pix.sizeimage) | |
| print "format bytesperline: " + str(self.format.fmt.pix.bytesperline) | |
| #buff = ConvertToYUYV2(self.format.fmt.pix.sizeimage, self.format.fmt.pix.bytesperline, image_np) | |
| buff = ConvertToYUYV2_fast(self.format.fmt.pix.sizeimage, self.format.fmt.pix.bytesperline, image_np) | |
| print "write to device" | |
| self.device.write(buff) | |
| def insert_frame_slow_bgr(self): | |
| np_arr = np.fromstring(self.last_img.data, np.uint8) | |
| image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR) | |
| image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2YUV_I420) | |
| #image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2) | |
| if not self.initialized: | |
| # if type(self.last_img) == type(Image()): | |
| width = image_np.shape[1] | |
| height = image_np.shape[0] | |
| #width = self.last_img.width | |
| #height = self.last_img.height | |
| print "image size: " | |
| print width | |
| print height | |
| capability = v4l2_capability() | |
| print "get capabilities result", (fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)) | |
| print "capabilities", hex(capability.capabilities) | |
| #fmt = V4L2_PIX_FMT_YUYV # this worked almost | |
| #fmt = V4L2_PIX_FMT_BGR24 | |
| fmt = V4L2_PIX_FMT_YUV420 | |
| #fmt = V4L2_PIX_FMT_RGB24 | |
| #fmt = V4L2_PIX_FMT_YVU420 | |
| print("v4l2 driver: " + capability.driver) | |
| self.format = v4l2_format() | |
| self.format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT | |
| self.format.fmt.pix.pixelformat = fmt | |
| self.format.fmt.pix.width = width | |
| self.format.fmt.pix.height = height | |
| self.format.fmt.pix.field = V4L2_FIELD_NONE | |
| self.format.fmt.pix.bytesperline = width * 2 # was 2 | |
| self.format.fmt.pix.sizeimage = width * height * 2 # was 2 | |
| self.format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG | |
| #self.format.fmt.pix.colorspace = V4L2_COLORSPACE_SRGB | |
| print "set format result", (fcntl.ioctl(self.device, VIDIOC_S_FMT, self.format)) | |
| self.initialized = True | |
| #Note that format.fmt.pix.sizeimage and format.fmt.pix.bytesperline | |
| #may have changed at this point | |
| #print "convert img passthrough" | |
| #cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough") | |
| #cv_image.tostring() | |
| print "convert to yuyv2" | |
| print "format sizeiamge: " + str(self.format.fmt.pix.sizeimage) | |
| print "format bytesperline: " + str(self.format.fmt.pix.bytesperline) | |
| #buff = ConvertToYUYV2(self.format.fmt.pix.sizeimage, self.format.fmt.pix.bytesperline, image_np) | |
| buff = image_np | |
| print "write to device" | |
| self.device.write(buff) | |
| def insert_lenna(self): | |
| width = 512 | |
| height = 512 | |
| capability = v4l2_capability() | |
| print "get capabilities result", (fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)) | |
| print "capabilities", hex(capability.capabilities) | |
| fmt = V4L2_PIX_FMT_YUYV | |
| #fmt = V4L2_PIX_FMT_YVU420 | |
| print("v4l2 driver: " + capability.driver) | |
| format = v4l2_format() | |
| format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT | |
| format.fmt.pix.pixelformat = fmt | |
| format.fmt.pix.width = width | |
| format.fmt.pix.height = height | |
| format.fmt.pix.field = V4L2_FIELD_NONE | |
| format.fmt.pix.bytesperline = width * 2 | |
| format.fmt.pix.sizeimage = width * height * 2 | |
| format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG | |
| print "set format result", (fcntl.ioctl(self.device, VIDIOC_S_FMT, format)) | |
| #Note that format.fmt.pix.sizeimage and format.fmt.pix.bytesperline | |
| #may have changed at this point | |
| #Create image buffer | |
| im = misc.imread("Lenna.png") # image as array | |
| buff = ConvertToYUYV2(format.fmt.pix.sizeimage, format.fmt.pix.bytesperline, im) | |
| while True: | |
| self.device.write(buff) | |
| time.sleep(1./30.) | |
| if __name__=="__main__": | |
| rospy.init_node('imagetopictovideodevice') | |
| vdft = VideoDeviceFromTopic() | |
| vdft.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment