Skip to content

Instantly share code, notes, and snippets.

@Mason-McGough
Created September 17, 2021 00:14
Show Gist options
  • Save Mason-McGough/34b55067925d07ded67a772dd50aa2d3 to your computer and use it in GitHub Desktop.
Save Mason-McGough/34b55067925d07ded67a772dd50aa2d3 to your computer and use it in GitHub Desktop.
Threaded frame grabbing and display for generic webcams or an Intel RealSense camera
import time
import threading
import numpy as np
import cv2 as cv
def _is_window_open(win_name):
    """
    Report whether the named OpenCV window is currently visible.

    Args:
        win_name (str): Name of the `cv2.imshow()` window to query.

    Returns:
        bool: True if the `WND_PROP_VISIBLE` property of the window is
            positive, False otherwise.
    """
    try:
        return cv.getWindowProperty(win_name, cv.WND_PROP_VISIBLE) > 0
    except cv.error:
        # NOTE(review): some HighGUI backends raise instead of returning a
        # non-positive value when the window does not exist; for a boolean
        # predicate that simply means "not open".
        return False
class VideoGetter():
    """
    Read frames from a video capture device in a dedicated thread.

    The most recent frame is available as `self.frame`; `self.grabbed`
    tells whether the last read succeeded and `self.stopped` whether the
    reader loop has been asked to end.
    """

    # Upper bound, in seconds, on how long `close()` waits for the reader
    # thread before releasing the capture anyway.
    _JOIN_TIMEOUT = 2.0

    def __init__(self, src=0, timeout=100000):
        """
        Args:
            src (int|str): Video source. Int if webcam id, str if path to file.
            timeout (int): Number of additional read attempts made after the
                first one fails before giving up.

        Raises:
            TimeoutError: If no frame could be read within `timeout` retries.
        """
        self.cap = cv.VideoCapture(src)
        self.frame = None
        self.stopped = False
        self._thread = None
        self.grabbed = self._wait_for_first_frame(src, timeout)

    def _wait_for_first_frame(self, src, timeout):
        """Read until a frame is grabbed, retrying at most `timeout` times."""
        grabbed = self.grab_frame()
        cnt = 0
        while not grabbed:
            cnt += 1
            if cnt > timeout:
                # Do not leak the device handle when initialisation fails.
                self.cap.release()
                raise TimeoutError(f'Failed to initialize camera {src}')
            # Retry the read; without this the loop could never succeed.
            grabbed = self.grab_frame()
        return grabbed

    def grab_frame(self):
        """
        Read one frame from `self.cap` into `self.frame`.

        Returns:
            The success flag returned by `self.cap.read()`.
        """
        grabbed, self.frame = self.cap.read()
        return grabbed

    def start(self):
        """
        Launch the reader thread running `get()`.

        Returns:
            VideoGetter: `self`, so the call can be chained.
        """
        self._thread = threading.Thread(target=self.get, args=())
        self._thread.start()
        return self

    def get(self):
        """
        Method called in a thread to continually read frames from `self.cap`.
        This way, a frame is always ready to be read. Frames are not queued;
        if a frame is not read before `get()` reads a new frame, previous
        frame is overwritten.
        """
        while not self.stopped:
            if not self.grabbed:
                # A failed read ends the loop (e.g. end of a video file).
                self.stop()
            else:
                self.grabbed = self.grab_frame()

    def stop(self):
        """Ask the reader loop in `get()` to finish."""
        self.stopped = True

    def close(self):
        """Stop the reader loop and release the capture device."""
        if not self.stopped:
            self.stop()
        # Let the reader leave `cap.read()` before the capture is released.
        # `getattr` keeps this safe for subclasses that skip `__init__`.
        worker = getattr(self, '_thread', None)
        if worker is not None and worker is not threading.current_thread():
            worker.join(self._JOIN_TIMEOUT)
        self.cap.release()
class RealSenseVideoGetter(VideoGetter):
    """
    Read frames from Intel RealSense sensor in a dedicated thread.

    Every successful grab updates `self.depth_image`, `self.color_image`
    and `self.depth_intrinsics`; when `color_depth` is set it also updates
    `self.colored_depth_image`. The reader loop (`get`) and `stop` are
    inherited from `VideoGetter`.
    """

    def __init__(self, color_depth=False, fps=30):
        """
        Args:
            color_depth (bool): If True, also produce a colorized version of
                each depth frame.
            fps (int): Frame rate requested for the depth and color streams.
        """
        # Imported lazily so pyrealsense2 is only needed when this class is
        # used. The module is kept on the instance because the other
        # methods need it and a function-local import is not visible there.
        import pyrealsense2 as rs
        self._rs = rs

        self.color_depth = color_depth
        self.fps = fps
        self._thread = None
        self._streaming = False
        self.init_stream()

        # Built once and reused for every frameset.
        self._align = rs.align(rs.stream.color)
        if color_depth:
            self.colorizer = rs.colorizer()
        self.point_cloud_block = rs.pointcloud()

        self.grabbed = self.grab_frame()
        self.stopped = False

    def init_stream(self):
        """Configure the depth and color streams and start the pipeline."""
        rs = self._rs

        # configure depth and color streams
        self.pipeline = rs.pipeline()
        self.config = rs.config()

        # get device product line for setting a supporting resolution
        pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)
        pipeline_profile = self.config.resolve(pipeline_wrapper)
        device = pipeline_profile.get_device()
        device_product_line = str(device.get_info(rs.camera_info.product_line))

        # enable video streams
        self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16,
                                  framerate=self.fps)
        if device_product_line == 'L500':
            self.config.enable_stream(rs.stream.color, 960, 540,
                                      rs.format.bgr8, framerate=self.fps)
        else:
            self.config.enable_stream(rs.stream.color, 640, 480,
                                      rs.format.bgr8, framerate=self.fps)

        # start streaming
        self.pipeline.start(self.config)
        self._streaming = True

    def grab_frame(self):
        """
        Block until a frameset with both a depth and a color frame arrives,
        then store the images as numpy arrays on the instance.

        Returns:
            bool: Always True once a complete pair has been stored.
        """
        rs = self._rs

        # wait for a coherent pair of frames: depth and color
        depth_frame = None
        color_frame = None
        while not depth_frame or not color_frame:
            frameset = self.pipeline.wait_for_frames()

            # align depth image to color
            frameset = self._align.process(frameset)

            # gather frames
            depth_frame = frameset.get_depth_frame()
            color_frame = frameset.get_color_frame()

        self.depth_intrinsics = rs.video_stream_profile(
            depth_frame.profile).get_intrinsics()

        # convert images to numpy arrays
        self.depth_image = np.asanyarray(depth_frame.get_data())
        self.color_image = np.asanyarray(color_frame.get_data())
        if self.color_depth:
            self.colored_depth_image = np.asanyarray(
                self.colorizer.colorize(depth_frame).get_data())

        return True

    def start(self):
        """
        Launch the reader thread running `get()`.

        Returns:
            RealSenseVideoGetter: `self`, so the call can be chained.
        """
        self._thread = threading.Thread(target=self.get, args=())
        self._thread.start()
        return self

    def close(self):
        """
        Stop the reader loop and the RealSense pipeline.

        Overrides the base implementation, which releases `self.cap`; this
        class never creates that attribute.
        """
        if not self.stopped:
            self.stop()
        # Let the reader finish its current grab before stopping the pipeline.
        worker = self._thread
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        if self._streaming:
            self.pipeline.stop()
            self._streaming = False
class VideoShower():
    """
    Class to show frames in a dedicated thread.

    `self.frames` and `self.win_names` are paired positionally: the i-th
    frame is shown in the i-th window. Callers update `self.frames` to
    change what is displayed.
    """

    def __init__(self, frames=None, win_names=None, wait=1):
        """
        Args:
            frames (list|None): (Initial) frames to display, one per window.
                Entries that are None are skipped. Defaults to a new empty
                list.
            win_names (list|None): Names of the `cv2.imshow()` windows, in
                the same order as `frames`. Defaults to a new empty list.
            wait (int): Delay passed to `cv2.waitKey()` on every pass.
        """
        # Fresh lists per instance: a mutable default would be shared by
        # every VideoShower created without arguments. Lists passed in are
        # kept by reference, not copied.
        self.frames = [] if frames is None else frames
        self.win_names = [] if win_names is None else win_names
        self.wait = wait
        self.stopped = True

    def start(self):
        """
        Launch the display thread running `show()`.

        Returns:
            VideoShower: `self`, so the call can be chained.
        """
        self.stopped = False
        threading.Thread(target=self.show, args=()).start()
        return self

    def show(self):
        """
        Method called within thread to show new frames.

        Runs until `stop()` is called or the "q" key is pressed.
        """
        while not self.stopped:
            for frame, win_name in zip(self.frames, self.win_names):
                # Every non-None frame is shown on each pass. To skip
                # redundant redraws, the caller can set an entry of
                # `self.frames` to None until a new frame is available.
                if frame is not None:
                    cv.imshow(win_name, frame)
            # Mask to the low byte: some platforms set extra high bits in
            # the value returned by waitKey.
            if (cv.waitKey(self.wait) & 0xFF) == ord("q"):
                self.stop()

    def stop(self):
        """End the display loop and destroy the windows; no-op if stopped."""
        if not self.stopped:
            self.stopped = True
            for win_name in self.win_names:
                try:
                    cv.destroyWindow(win_name)
                except cv.error:
                    # The window may never have been created, e.g. when its
                    # frame was always None; nothing to destroy then.
                    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment