Last active
July 17, 2024 22:46
-
-
Save CasiaFan/4889860090b6908c39d756308a4debfa to your computer and use it in GitHub Desktop.
Asynchronous caching and analysis of video stream with opencv and multithreading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
from redis import ConnectionPool, Redis | |
import numpy as np | |
import json, time | |
from multithreading import Thread, Event | |
redis_config = {"server": "localhost", | |
"passwd": '', | |
"port": '6379', | |
"db": 0} | |
class CamStream(object): | |
def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False): | |
""" | |
cam_addr_list: list of camera device addresses. | |
""" | |
self._cams = [] | |
for cam_addr in cam_addr_list: | |
cap = cv2.VideoCapture(cam_addr) | |
if fps: | |
cap.set(cv2.CAP_PROP_FPS, fps) | |
self._cams.append(cap) | |
self.cam_addr_list = cam_addr_list | |
self.fps = fps | |
self._image_size = image_size | |
self._use_cache = use_cache | |
if self._use_cache: | |
self._redis_db = self._get_redis_conn(host=redis_config["server"], | |
passwd=redis_config["passwd"], | |
port=redis_config["port"], | |
db=redis_config["db"]) | |
# self._start_caching = False # primitive variable are not passed to subprocess | |
self._queue_name = "video" | |
self._queue_length = 600 | |
def _get_redis_conn(self, host, passwd, port, db): | |
# initialize the redis queue for storing image cache | |
pool = ConnectionPool(host=host, password=passwd, port=port, db=db) | |
redis_db = Redis(connection_pool=pool) | |
return redis_db | |
def start_cache(self): | |
# self._start_caching = True | |
self._start_caching = Event() | |
self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams}) | |
self.p.start() | |
return self | |
def _cache_image(self, cams): | |
""" | |
Cache captured image into redis queue | |
""" | |
while 1: | |
# if self._start_caching: | |
if not self._start_caching.is_set(): | |
frames = [] | |
for cam in cams: | |
ret, frame = cam.read() | |
if ret: | |
if self._image_size: | |
frame = cv2.resize(frame, self._image_size) | |
frames.append(frame) | |
if frames: | |
frames = np.stack(frames) | |
# info = {"frames": frames.tolist()} | |
info = frames.tostring() | |
# self._redis_db.rpush(self._queue_name, json.dumps(info)) | |
self._redis_db.rpush(self._queue_name, info) | |
self._redis_db.ltrim(self._queue_name, 0, self._queue_length) | |
else: | |
time.sleep(0.5) | |
def stop_cache(self): | |
# self._start_caching = False | |
self._start_caching.set() | |
self.p.join(timeout=0.5) | |
return self | |
def check_cache(self): | |
ret = self._redis_db.lpop(self._queue_name) | |
if ret: | |
print("Queue exists!") | |
return True | |
else: | |
print("Queue dies") | |
return False | |
def delete_cache(self): | |
self._redis_db.delete(self._queue_name) | |
def capture(self): | |
"""Capture images from all camera and return them in batch""" | |
if self._use_cache: | |
print(self._redis_db.llen(self._queue_name)) | |
frame_buf = self._redis_db.lpop(self._queue_name) | |
if frame_buf: | |
# frames = [np.array(x).astype(np.uint8) for x in json.loads(frame_buf)["frames"]] | |
frames = np.reshape(np.fromstring(frame_buf, dtype=np.uint8), (len(self._cams), self._image_size[1], self._image_size[0], 3)) | |
frames = [x for x in frames] | |
else: | |
frames = [] | |
else: | |
frames = [] | |
for cam in self._cams: | |
ret, frame = cam.read() | |
if self._image_size: | |
frame = cv2.resize(frame, self._image_size) | |
if ret: | |
frames.append(frame) | |
if frames: | |
return frames | |
else: | |
print("Fail to capture frame!") | |
return [] | |
def test_cam_stream(): | |
addr = [0, 1] | |
cs = CamStream(addr, (640, 480), use_cache=True, fps=20) | |
cs.start_cache() | |
time.sleep(2) | |
while 1: | |
frames = cs.capture() | |
if frames: | |
cv2.imshow("test", frames[0]) | |
key = cv2.waitKey(20) | |
if key == ord('q'): | |
cs.stop_cache() | |
while 1: | |
if cs.check_cache(): | |
print("not empty") | |
else: | |
break | |
cv2.destroyAllWindows() | |
break | |
print("Done") | |
if __name__ == "__main__": | |
test_cam_stream() |
python v3.6.x
windows 10Facing this error
python asynchronous_caching_video_stream.pyTraceback (most recent call last): File "asynchronous_caching_video_stream.py", line 5, in <module> from multithreading import Thread, Event ModuleNotFoundError: No module named 'multithreading'
try this - from threading import Thread, Event
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
python v3.6.x
windows 10
Facing this error
python asynchronous_caching_video_stream.py