Created
September 17, 2021 00:14
-
-
Save Mason-McGough/34b55067925d07ded67a772dd50aa2d3 to your computer and use it in GitHub Desktop.
Threaded frame grabbing and display for generic webcams or Intel RealSense camera
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import threading | |
import numpy as np | |
import cv2 as cv | |
def _is_window_open(win_name): | |
return cv.getWindowProperty(win_name, cv.WND_PROP_VISIBLE) > 0 | |
class VideoGetter(): | |
""" | |
Read frames from a video capture device in a dedicated thread. | |
""" | |
def __init__(self, src=0, timeout=100000): | |
""" | |
Args: | |
src (int|str): Video source. Int if webcam id, str if path to file. | |
""" | |
self.cap = cv.VideoCapture(src) | |
# self.grabbed, self.frame = self.cap.read() | |
cnt = 0 | |
self.grabbed = self.grab_frame() | |
while not self.grabbed: | |
cnt += 1 | |
if cnt > timeout: | |
raise TimeoutError(f'Failed to initialize camera {src}') | |
self.stopped = False | |
def grab_frame(self): | |
grabbed, self.frame = self.cap.read() | |
return grabbed | |
def start(self): | |
threading.Thread(target=self.get, args=()).start() | |
return self | |
def get(self): | |
""" | |
Method called in a thread to continually read frames from `self.cap`. | |
This way, a frame is always ready to be read. Frames are not queued; | |
if a frame is not read before `get()` reads a new frame, previous | |
frame is overwritten. | |
""" | |
while not self.stopped: | |
if not self.grabbed: | |
self.stop() | |
else: | |
self.grabbed = self.grab_frame() | |
def stop(self): | |
self.stopped = True | |
def close(self): | |
if not self.stopped: | |
self.stop() | |
self.cap.release() | |
class RealSenseVideoGetter(VideoGetter): | |
""" | |
Read frames from Intel RealSense sensor in a dedicated thread. | |
""" | |
def __init__(self, color_depth=False, fps=30): | |
""" | |
Args: | |
src (int|str): Video source. Int if webcam id, str if path to file. | |
""" | |
import pyrealsense2 as rs | |
self.color_depth = color_depth | |
self.fps = fps | |
self.init_stream() | |
if color_depth: | |
self.colorizer = rs.colorizer() | |
self.point_cloud_block = rs.pointcloud() | |
self.grabbed = self.grab_frame() | |
self.stopped = False | |
def init_stream(self): | |
# configure depth and color streams | |
self.pipeline = rs.pipeline() | |
self.config = rs.config() | |
# get device product line for setting a supporting resolution | |
pipeline_wrapper = rs.pipeline_wrapper(self.pipeline) | |
pipeline_profile = self.config.resolve(pipeline_wrapper) | |
device = pipeline_profile.get_device() | |
device_product_line = str(device.get_info(rs.camera_info.product_line)) | |
# enable video streams | |
self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, | |
framerate=self.fps) | |
if device_product_line == 'L500': | |
self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, | |
framerate=self.fps) | |
else: | |
self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, | |
framerate=self.fps) | |
# start streaming | |
self.pipeline.start(self.config) | |
def grab_frame(self): | |
# wait for a coherent pair of frames: depth and color | |
depth_frame = None | |
color_frame = None | |
while not depth_frame or not color_frame: | |
frameset = self.pipeline.wait_for_frames() | |
# undistort and align depth image to color | |
align = rs.align(rs.stream.color) | |
frameset = align.process(frameset) | |
# gather frames | |
depth_frame = frameset.get_depth_frame() | |
color_frame = frameset.get_color_frame() | |
self.depth_intrinsics = rs.video_stream_profile( | |
depth_frame.profile).get_intrinsics() | |
# convert images to numpy arrays | |
self.depth_image = np.asanyarray(depth_frame.get_data()) | |
self.color_image = np.asanyarray(color_frame.get_data()) | |
if self.color_depth: | |
self.colored_depth_image = np.asanyarray( | |
self.colorizer.colorize(depth_frame).get_data()) | |
return True | |
def start(self): | |
threading.Thread(target=self.get, args=()).start() | |
return self | |
def get(self): | |
""" | |
Method called in a thread to continually read frames from `self.cap`. | |
This way, a frame is always ready to be read. Frames are not queued; | |
if a frame is not read before `get()` reads a new frame, previous | |
frame is overwritten. | |
""" | |
while not self.stopped: | |
if not self.grabbed: | |
self.stop() | |
else: | |
self.grabbed = self.grab_frame() | |
def stop(self): | |
self.stopped = True | |
class VideoShower(): | |
""" | |
Class to show frames in a dedicated thread. | |
""" | |
def __init__(self, frames=[], win_names=[], wait=1): | |
""" | |
Args: | |
frame (np.ndarray): (Initial) frame to display. | |
win_name (str): Name of `cv2.imshow()` window. | |
""" | |
self.frames = frames | |
self.win_names = win_names | |
self.wait = wait | |
self.stopped = True | |
def start(self): | |
# cv.namedWindow(self.win_name) | |
self.stopped = False | |
threading.Thread(target=self.show, args=()).start() | |
return self | |
def show(self): | |
""" | |
Method called within thread to show new frames. | |
""" | |
while not self.stopped: | |
for frame, win_name in zip(self.frames, self.win_names): | |
# We can actually see an ~8% increase in FPS by only calling | |
# cv2.imshow when a new frame is set with an if statement. Thus, | |
# set `self.frame` to None after each call to `cv2.imshow()`. | |
if frame is not None: | |
cv.imshow(win_name, frame) | |
frame = None | |
if cv.waitKey(self.wait) == ord("q"): | |
self.stop() | |
def stop(self): | |
if not self.stopped: | |
self.stopped = True | |
for win_name in self.win_names: | |
cv.destroyWindow(win_name) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment