Skip to content

Instantly share code, notes, and snippets.

@CasiaFan
Last active July 17, 2024 22:46
Show Gist options
  • Save CasiaFan/4889860090b6908c39d756308a4debfa to your computer and use it in GitHub Desktop.
Save CasiaFan/4889860090b6908c39d756308a4debfa to your computer and use it in GitHub Desktop.
Asynchronous caching and analysis of video stream with opencv and multithreading
import cv2
from redis import ConnectionPool, Redis
import numpy as np
import json, time
from multithreading import Thread, Event
redis_config = {"server": "localhost",
"passwd": '',
"port": '6379',
"db": 0}
class CamStream(object):
def __init__(self, cam_addr_list, image_size=None, fps=None, use_cache=False):
"""
cam_addr_list: list of camera device addresses.
"""
self._cams = []
for cam_addr in cam_addr_list:
cap = cv2.VideoCapture(cam_addr)
if fps:
cap.set(cv2.CAP_PROP_FPS, fps)
self._cams.append(cap)
self.cam_addr_list = cam_addr_list
self.fps = fps
self._image_size = image_size
self._use_cache = use_cache
if self._use_cache:
self._redis_db = self._get_redis_conn(host=redis_config["server"],
passwd=redis_config["passwd"],
port=redis_config["port"],
db=redis_config["db"])
# self._start_caching = False # primitive variable are not passed to subprocess
self._queue_name = "video"
self._queue_length = 600
def _get_redis_conn(self, host, passwd, port, db):
# initialize the redis queue for storing image cache
pool = ConnectionPool(host=host, password=passwd, port=port, db=db)
redis_db = Redis(connection_pool=pool)
return redis_db
def start_cache(self):
# self._start_caching = True
self._start_caching = Event()
self.p = Thread(target=self._cache_image, kwargs={"cams": self._cams})
self.p.start()
return self
def _cache_image(self, cams):
"""
Cache captured image into redis queue
"""
while 1:
# if self._start_caching:
if not self._start_caching.is_set():
frames = []
for cam in cams:
ret, frame = cam.read()
if ret:
if self._image_size:
frame = cv2.resize(frame, self._image_size)
frames.append(frame)
if frames:
frames = np.stack(frames)
# info = {"frames": frames.tolist()}
info = frames.tostring()
# self._redis_db.rpush(self._queue_name, json.dumps(info))
self._redis_db.rpush(self._queue_name, info)
self._redis_db.ltrim(self._queue_name, 0, self._queue_length)
else:
time.sleep(0.5)
def stop_cache(self):
# self._start_caching = False
self._start_caching.set()
self.p.join(timeout=0.5)
return self
def check_cache(self):
ret = self._redis_db.lpop(self._queue_name)
if ret:
print("Queue exists!")
return True
else:
print("Queue dies")
return False
def delete_cache(self):
self._redis_db.delete(self._queue_name)
def capture(self):
"""Capture images from all camera and return them in batch"""
if self._use_cache:
print(self._redis_db.llen(self._queue_name))
frame_buf = self._redis_db.lpop(self._queue_name)
if frame_buf:
# frames = [np.array(x).astype(np.uint8) for x in json.loads(frame_buf)["frames"]]
frames = np.reshape(np.fromstring(frame_buf, dtype=np.uint8), (len(self._cams), self._image_size[1], self._image_size[0], 3))
frames = [x for x in frames]
else:
frames = []
else:
frames = []
for cam in self._cams:
ret, frame = cam.read()
if self._image_size:
frame = cv2.resize(frame, self._image_size)
if ret:
frames.append(frame)
if frames:
return frames
else:
print("Fail to capture frame!")
return []
def test_cam_stream():
addr = [0, 1]
cs = CamStream(addr, (640, 480), use_cache=True, fps=20)
cs.start_cache()
time.sleep(2)
while 1:
frames = cs.capture()
if frames:
cv2.imshow("test", frames[0])
key = cv2.waitKey(20)
if key == ord('q'):
cs.stop_cache()
while 1:
if cs.check_cache():
print("not empty")
else:
break
cv2.destroyAllWindows()
break
print("Done")
if __name__ == "__main__":
test_cam_stream()
@ajaysjournal
Copy link

ajaysjournal commented Jul 31, 2020

python v3.6.x
windows 10

Facing this error
python asynchronous_caching_video_stream.py

Traceback (most recent call last):
  File "asynchronous_caching_video_stream.py", line 5, in <module>
    from multithreading import Thread, Event
ModuleNotFoundError: No module named 'multithreading'

try this - from threading import Thread, Event

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment