Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created August 12, 2015 16:12
Show Gist options
  • Select an option

  • Save awesomebytes/d51fbd77ab1b887e7c3e to your computer and use it in GitHub Desktop.

Select an option

Save awesomebytes/d51fbd77ab1b887e7c3e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import fcntl
import functools
import os
import sys
import time

import cv2
import numpy as np
import rospy
import scipy.misc as misc
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image, CompressedImage
from v4l2 import *
def timing(f):
    """Decorator that prints the wall-clock duration of every call.

    The wrapped function's return value is passed through unchanged.
    Accepts keyword arguments (the original wrapper dropped them) and
    preserves the wrapped function's metadata via functools.wraps.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        start = time.time()
        ret = f(*args, **kwargs)
        elapsed_ms = (time.time() - start) * 1000.0
        print(' function took %0.3f ms' % elapsed_ms)
        return ret
    return wrap
@timing
def ConvertToYUYV2(sizeimage, bytesperline, im):
    """Convert an image to packed YUYV bytes, pixel by pixel (slow path).

    :param sizeimage: buffer size negotiated via VIDIOC_S_FMT
    :param bytesperline: stride of one YUYV row in bytes
    :param im: 3-channel image; the Pb/Pr coefficients below assume
               channel order R,G,B (callers convert BGR->RGB first) --
               TODO confirm; note cvtColor still uses COLOR_BGR2GRAY
    :returns: ``padding`` zero bytes followed by the YUYV frame, as bytes

    Chroma is subsampled 2:1 horizontally by averaging pixel pairs.
    Writes past the end of the buffer are silently skipped (the device
    format may be smaller than the image).
    """
    padding = 4096  # leading zeros experimentally needed by the device
    buff = np.zeros((sizeimage + padding,), dtype=np.uint8)
    imgrey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    time1 = time.time()
    # BT.601 chroma planes computed from the raw channels.
    Pb = im[:, :, 0] * -0.168736 + im[:, :, 1] * -0.331264 + im[:, :, 2] * 0.5
    time2 = time.time()
    print('Pb generation took %0.3f ms' % ((time2 - time1) * 1000.0))
    time1 = time.time()
    Pr = im[:, :, 0] * 0.5 + im[:, :, 1] * -0.418688 + im[:, :, 2] * -0.081312
    time2 = time.time()
    print('Pr generation took %0.3f ms' % ((time2 - time1) * 1000.0))
    time1 = time.time()
    for y in range(imgrey.shape[0]):
        # Set luminance: every even byte of the row.
        cursor = y * bytesperline + padding
        for x in range(imgrey.shape[1]):
            try:
                buff[cursor] = imgrey[y, x]
            except IndexError:
                pass  # beyond the negotiated buffer: drop the sample
            cursor += 2
        # Set color information for Cb: byte 1 of each 4-byte macropixel.
        cursor = y * bytesperline + padding
        for x in range(0, imgrey.shape[1], 2):
            try:
                buff[cursor + 1] = 0.5 * (Pb[y, x] + Pb[y, x + 1]) + 128
            except IndexError:
                pass
            cursor += 4
        # Set color information for Cr: byte 3 of each macropixel.
        cursor = y * bytesperline + padding
        for x in range(0, imgrey.shape[1], 2):
            try:
                buff[cursor + 3] = 0.5 * (Pr[y, x] + Pr[y, x + 1]) + 128
            except IndexError:
                pass
            cursor += 4
    time2 = time.time()
    print('loops took %0.3f ms' % ((time2 - time1) * 1000.0))
    # tobytes() replaces the deprecated tostring() (removed in new NumPy).
    return buff.tobytes()
@timing
def ConvertToYUYV2_fast(sizeimage, bytesperline, im):
    """Convert an image to a packed YUYV buffer using NumPy slicing.

    :param sizeimage: buffer size negotiated via VIDIOC_S_FMT
    :param bytesperline: stride of one YUYV row in bytes
    :param im: 3-channel image; the Pb/Pr coefficients below assume
               channel order R,G,B (callers convert BGR->RGB first) --
               TODO confirm; note cvtColor still uses COLOR_BGR2GRAY
    :returns: uint8 ndarray: ``padding`` zero bytes, then the YUYV frame

    Byte layout per row (identical to ConvertToYUYV2): Y at every even
    byte, Cb at byte 1 and Cr at byte 3 of each 4-byte macropixel, with
    chroma averaged over each horizontal pixel pair.  The per-pixel
    Python loops of the original were replaced by strided slice
    assignments (the loops dominated the runtime despite the "_fast"
    name); a trailing odd column is ignored instead of raising
    IndexError.
    """
    padding = 4096  # leading zeros experimentally needed by the device
    buff = np.zeros((sizeimage + padding,), dtype=np.uint8)
    imgrey = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    time1 = time.time()
    Pb = im[:, :, 0] * -0.168736 + im[:, :, 1] * -0.331264 + im[:, :, 2] * 0.5
    time2 = time.time()
    print('Pb generation took %0.3f ms' % ((time2 - time1) * 1000.0))
    time1 = time.time()
    Pr = im[:, :, 0] * 0.5 + im[:, :, 1] * -0.418688 + im[:, :, 2] * -0.081312
    time2 = time.time()
    print('Pr generation took %0.3f ms' % ((time2 - time1) * 1000.0))
    time11 = time.time()
    rows = imgrey.shape[0]
    cols = imgrey.shape[1]
    print("rows: " + str(rows))
    print("cols: " + str(cols))
    pairs = (cols // 2) * 2  # even number of columns covered by chroma pairs
    for y in range(rows):
        base = y * bytesperline + padding
        # Luminance: every even byte of the row.
        buff[base:base + 2 * cols:2] = imgrey[y, :]
        # Cb: byte 1 of each macropixel, average of the pixel pair.
        # astype(np.uint8) reproduces the truncating C cast the original
        # element-wise float assignment performed.
        cb = 0.5 * (Pb[y, 0:pairs:2] + Pb[y, 1:pairs:2]) + 128
        buff[base + 1:base + 1 + 2 * pairs:4] = cb.astype(np.uint8)
        # Cr: byte 3 of each macropixel.
        cr = 0.5 * (Pr[y, 0:pairs:2] + Pr[y, 1:pairs:2]) + 128
        buff[base + 3:base + 3 + 2 * pairs:4] = cr.astype(np.uint8)
    time22 = time.time()
    print('loops took %0.3f ms' % ((time22 - time11) * 1000.0))
    return buff
def ConvertToYUYV(image, width=None, height=None):
    """Pack a BGR image into an interleaved 2-bytes-per-pixel YUYV buffer.

    Chroma bytes are taken from every other pixel (subsampling without
    averaging).  The ``width``/``height`` arguments are accepted for API
    compatibility but not used.
    """
    out = np.zeros(image.shape[0] * image.shape[1] * 2, dtype=np.uint8)
    flat = cv2.cvtColor(image, cv2.COLOR_BGR2YUV).ravel()
    out[0::2] = flat[0::3]  # luminance of every pixel
    out[1::4] = flat[2::6]  # first chroma byte of each macropixel
    out[3::4] = flat[1::6]  # second chroma byte of each macropixel
    return out
class VideoDeviceFromTopic(object):
    """Pipe a ROS ``sensor_msgs/Image`` topic into a V4L2 output device.

    Subscribes to the ``image`` topic and writes the latest frame into a
    v4l2loopback-style device (default ``/dev/video26``) at a fixed
    rate, converting it to the pixel format negotiated with the device.
    """

    def __init__(self):
        rospy.loginfo("Initializing VideoDeviceFromTopic")
        # Private parameters: device path and output frame rate.
        self.device_name = rospy.get_param('video_device', '/dev/video26')
        rospy.loginfo("Using device: " + str(self.device_name))
        self.fps = rospy.get_param('fps', 30)
        rospy.loginfo("Will write images at fps: " + str(self.fps))
        if not os.path.exists(self.device_name):
            rospy.logerr("Warning: device does not exist: " + str(self.device_name))
            exit(1)  # was exit(0): a fatal error should not report success
        self.bridge = CvBridge()
        # Binary mode: raw frame buffers must not pass through text encoding.
        self.device = open(self.device_name, 'wb')
        self.last_img = None      # most recent Image message (set by image_cb)
        self.initialized = False  # True once the V4L2 format has been set
        self.format = None        # negotiated v4l2_format
        self.sub = rospy.Subscriber('image', Image, self.image_cb, queue_size=1)

    def image_cb(self, data):
        """Store the latest Image message; conversion happens in run().

        :type data: Image
        """
        self.last_img = data

    def _configure_device(self, width, height, fmt):
        """Query device capabilities and set the V4L2 output format once.

        Sets ``self.format`` and ``self.initialized``.

        NOTE(review): bytesperline/sizeimage assume 2 bytes per pixel
        (YUYV) and are kept even for 3-byte formats, as in the original
        experiments; the driver may adjust both fields on VIDIOC_S_FMT.
        """
        capability = v4l2_capability()
        print("get capabilities result " + str(fcntl.ioctl(self.device, VIDIOC_QUERYCAP, capability)))
        print("capabilities " + hex(capability.capabilities))
        print("v4l2 driver: " + str(capability.driver))
        self.format = v4l2_format()
        self.format.type = V4L2_BUF_TYPE_VIDEO_OUTPUT
        self.format.fmt.pix.pixelformat = fmt
        self.format.fmt.pix.width = width
        self.format.fmt.pix.height = height
        self.format.fmt.pix.field = V4L2_FIELD_NONE
        self.format.fmt.pix.bytesperline = width * 2
        self.format.fmt.pix.sizeimage = width * height * 2
        self.format.fmt.pix.colorspace = V4L2_COLORSPACE_JPEG
        print("set format result " + str(fcntl.ioctl(self.device, VIDIOC_S_FMT, self.format)))
        self.initialized = True

    def run(self):
        """Block until the first image arrives, then stream at self.fps."""
        rospy.loginfo("Waiting for first image...")
        while self.last_img is None:
            rospy.sleep(0.1)
        r = rospy.Rate(self.fps)
        while not rospy.is_shutdown():
            self.insert_frame_slow_uncompressed()
            r.sleep()

    def insert_frame(self):
        """Write the last frame as raw BGR24 bytes (no YUYV conversion)."""
        if not self.initialized:
            width = self.last_img.width
            height = self.last_img.height
            print("image size: ")
            print(width)
            print(height)
            self._configure_device(width, height, V4L2_PIX_FMT_BGR24)
        cv_image = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough")
        self.device.write(cv_image)

    def insert_frame_slow_uncompressed(self):
        """Convert the last (uncompressed) Image to YUYV and write it."""
        image_np = self.bridge.imgmsg_to_cv2(self.last_img, "passthrough")
        # The YUYV converters expect R,G,B channel order.
        image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
        if not self.initialized:
            width = self.last_img.width
            height = self.last_img.height
            print("image size: ")
            print(width)
            print(height)
            self._configure_device(width, height, V4L2_PIX_FMT_YUYV)
        print("convert to yuyv2")
        print("format sizeimage: " + str(self.format.fmt.pix.sizeimage))
        print("format bytesperline: " + str(self.format.fmt.pix.bytesperline))
        buff = ConvertToYUYV2_fast(self.format.fmt.pix.sizeimage,
                                   self.format.fmt.pix.bytesperline, image_np)
        print("write to device")
        self.device.write(buff)

    def insert_frame_slow(self):
        """Decode the last frame from compressed bytes, convert to YUYV, write.

        Expects ``self.last_img.data`` to hold an encoded image (i.e. a
        CompressedImage-style payload) -- TODO confirm against subscriber.
        """
        # NOTE(review): cv2.CV_LOAD_IMAGE_COLOR is OpenCV 2.x only; on
        # OpenCV 3+ this constant is cv2.IMREAD_COLOR.
        image_np = cv2.imdecode(np.frombuffer(self.last_img.data, np.uint8),
                                cv2.CV_LOAD_IMAGE_COLOR)
        image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
        if not self.initialized:
            width = image_np.shape[1]
            height = image_np.shape[0]
            print("image size: ")
            print(width)
            print(height)
            self._configure_device(width, height, V4L2_PIX_FMT_YUYV)
        print("convert to yuyv2")
        print("format sizeimage: " + str(self.format.fmt.pix.sizeimage))
        print("format bytesperline: " + str(self.format.fmt.pix.bytesperline))
        buff = ConvertToYUYV2_fast(self.format.fmt.pix.sizeimage,
                                   self.format.fmt.pix.bytesperline, image_np)
        print("write to device")
        self.device.write(buff)

    def insert_frame_slow_bgr(self):
        """Decode the last compressed frame, convert to planar YUV420, write."""
        # NOTE(review): cv2.CV_LOAD_IMAGE_COLOR is OpenCV 2.x only; on
        # OpenCV 3+ this constant is cv2.IMREAD_COLOR.
        image_np = cv2.imdecode(np.frombuffer(self.last_img.data, np.uint8),
                                cv2.CV_LOAD_IMAGE_COLOR)
        image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2YUV_I420)
        if not self.initialized:
            # NOTE(review): after I420 conversion shape[0] is 1.5x the real
            # image height -- kept as in the original experiments.
            width = image_np.shape[1]
            height = image_np.shape[0]
            print("image size: ")
            print(width)
            print(height)
            self._configure_device(width, height, V4L2_PIX_FMT_YUV420)
        print("convert to yuyv2")
        print("format sizeimage: " + str(self.format.fmt.pix.sizeimage))
        print("format bytesperline: " + str(self.format.fmt.pix.bytesperline))
        print("write to device")
        self.device.write(image_np)

    def insert_lenna(self):
        """Debug helper: stream a local Lenna.png forever at ~30 fps."""
        self._configure_device(512, 512, V4L2_PIX_FMT_YUYV)
        im = misc.imread("Lenna.png")  # image as array
        buff = ConvertToYUYV2(self.format.fmt.pix.sizeimage,
                              self.format.fmt.pix.bytesperline, im)
        while True:
            self.device.write(buff)
            time.sleep(1. / 30.)
if __name__=="__main__":
    # Entry point: start the ROS node and stream frames until shutdown.
    rospy.init_node('imagetopictovideodevice')
    vdft = VideoDeviceFromTopic()
    vdft.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment