Last active
July 17, 2024 22:46
-
-
Save CasiaFan/4889860090b6908c39d756308a4debfa to your computer and use it in GitHub Desktop.
Asynchronous caching and analysis of video stream with opencv and multithreading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
from redis import ConnectionPool, Redis | |
import numpy as np | |
import json, time | |
from multithreading import Thread, Event | |
redis_config = {"server": "localhost", | |
"passwd": '', | |
"port": '6379', | |
"db": 0} | |
class CamStream(object): | |
def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False): | |
""" | |
cam_addr_list: list of camera device addresses. | |
""" | |
self._cams = [] | |
for cam_addr in cam_addr_list: | |
cap = cv2.VideoCapture(cam_addr) | |
if fps: | |
cap.set(cv2.CAP_PROP_FPS, fps) | |
self._cams.append(cap) | |
self.cam_addr_list = cam_addr_list | |
self.fps = fps | |
self._image_size = image_size | |
self._use_cache = use_cache | |
if self._use_cache: | |
self._redis_db = self._get_redis_conn(host=redis_config["server"], | |
passwd=redis_config["passwd"], | |
port=redis_config["port"], | |
db=redis_config["db"]) | |
# self._start_caching = False # primitive variable are not passed to subprocess | |
self._queue_name = "video" | |
self._queue_length = 600 | |
def _get_redis_conn(self, host, passwd, port, db): | |
# initialize the redis queue for storing image cache | |
pool = ConnectionPool(host=host, password=passwd, port=port, db=db) | |
redis_db = Redis(connection_pool=pool) | |
return redis_db | |
def start_cache(self): | |
# self._start_caching = True | |
self._start_caching = Event() | |
self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams}) | |
self.p.start() | |
return self | |
def _cache_image(self, cams): | |
""" | |
Cache captured image into redis queue | |
""" | |
while 1: | |
# if self._start_caching: | |
if not self._start_caching.is_set(): | |
frames = [] | |
for cam in cams: | |
ret, frame = cam.read() | |
if ret: | |
if self._image_size: | |
frame = cv2.resize(frame, self._image_size) | |
frames.append(frame) | |
if frames: | |
frames = np.stack(frames) | |
# info = {"frames": frames.tolist()} | |
info = frames.tostring() | |
# self._redis_db.rpush(self._queue_name, json.dumps(info)) | |
self._redis_db.rpush(self._queue_name, info) | |
self._redis_db.ltrim(self._queue_name, 0, self._queue_length) | |
else: | |
time.sleep(0.5) | |
def stop_cache(self): | |
# self._start_caching = False | |
self._start_caching.set() | |
self.p.join(timeout=0.5) | |
return self | |
def check_cache(self): | |
ret = self._redis_db.lpop(self._queue_name) | |
if ret: | |
print("Queue exists!") | |
return True | |
else: | |
print("Queue dies") | |
return False | |
def delete_cache(self): | |
self._redis_db.delete(self._queue_name) | |
def capture(self): | |
"""Capture images from all camera and return them in batch""" | |
if self._use_cache: | |
print(self._redis_db.llen(self._queue_name)) | |
frame_buf = self._redis_db.lpop(self._queue_name) | |
if frame_buf: | |
# frames = [np.array(x).astype(np.uint8) for x in json.loads(frame_buf)["frames"]] | |
frames = np.reshape(np.fromstring(frame_buf, dtype=np.uint8), (len(self._cams), self._image_size[1], self._image_size[0], 3)) | |
frames = [x for x in frames] | |
else: | |
frames = [] | |
else: | |
frames = [] | |
for cam in self._cams: | |
ret, frame = cam.read() | |
if self._image_size: | |
frame = cv2.resize(frame, self._image_size) | |
if ret: | |
frames.append(frame) | |
if frames: | |
return frames | |
else: | |
print("Fail to capture frame!") | |
return [] | |
def test_cam_stream(): | |
addr = [0, 1] | |
cs = CamStream(addr, (640, 480), use_cache=True, fps=20) | |
cs.start_cache() | |
time.sleep(2) | |
while 1: | |
frames = cs.capture() | |
if frames: | |
cv2.imshow("test", frames[0]) | |
key = cv2.waitKey(20) | |
if key == ord('q'): | |
cs.stop_cache() | |
while 1: | |
if cs.check_cache(): | |
print("not empty") | |
else: | |
break | |
cv2.destroyAllWindows() | |
break | |
print("Done") | |
if __name__ == "__main__": | |
test_cam_stream() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
try this -
from threading import Thread, Event