Skip to content

Instantly share code, notes, and snippets.

@syehoonkim
Created July 18, 2024 04:45
Show Gist options
  • Select an option

  • Save syehoonkim/b4ddce9a73ed1a0299cf97593f13a4a9 to your computer and use it in GitHub Desktop.

Select an option

Save syehoonkim/b4ddce9a73ed1a0299cf97593f13a4a9 to your computer and use it in GitHub Desktop.
Video Ringbuffer
import cv2
import glob
from time import time
import os
import skvideo.io
# Shared capture handle that SplitWriter.write() pulls frames from.
# It is opened once at import time through OpenCV's FFmpeg backend.
RTSP_URL = "rtsp://210.216.76.121:554/sbsonair0"
cap = cv2.VideoCapture(RTSP_URL, cv2.CAP_FFMPEG)
class SplitWriter:
    """Record frames from the module-level ``cap`` into rolling video splits.

    Each split is an ``.mp4`` file named
    ``<split_prefix>.<start_epoch_seconds>.<split_size>.mp4`` inside
    ``directory``. A new split is started once ``split_size`` seconds have
    passed since the current one began, and splits whose start time is older
    than ``split_size * split_history`` seconds are deleted, so the directory
    behaves like a ring buffer of recent video.
    """

    def __init__(
        self,
        split_size=10,
        directory="c:\\temp",
        split_history=5,
        split_prefix="split",
        compressed=True,
        fps=30,
    ):
        """Store the configuration; no file is opened until the first write().

        Args:
            split_size: Length of one split in seconds.
            directory: Directory the split files are written to.
            split_history: Number of split lengths worth of files to keep.
            split_prefix: File-name prefix of every split.
            compressed: Stored on the instance; not read by this class.
            fps: Stored on the instance; not passed to the encoder by this
                class.
        """
        self.split_size = split_size
        self.directory = directory
        self.split_history = split_history
        self.fps = fps
        self.compressed = compressed
        self.split_prefix = split_prefix
        # Epoch second at which the current split started (also in its name).
        self.current_split = 0
        # Epoch second after which the next split must be started. Starting
        # at 0 forces the very first write() to open a split.
        self.new_split = 0
        self.writer = None
        self.last_frame_delay = 0
        self.remote_frameno = 0
        # Number of frames successfully read so far.
        self.frameno = 0

    def _gen_split_name(self):
        """Return the full path of the split identified by current_split."""
        return os.path.join(
            self.directory,
            self.split_prefix + ".%d.%d.mp4" % (self.current_split, self.split_size),
        )

    def _start_new_split(self, frame):
        """Close the current split (if any), open the next one, prune old ones.

        Args:
            frame: The frame that triggered the roll-over. Unused; kept so
                the signature stays compatible with existing callers.
        """
        self.current_split = int(time())
        self.new_split = self.current_split + self.split_size
        if self.writer:
            self.writer.close()
        self.writer = skvideo.io.FFmpegWriter(
            self._gen_split_name(), outputdict={"-vcodec": "h264_nvenc"}
        )
        self._clear_old_splits()

    def write(self):
        """Read one frame from ``cap``, append it to the split and preview it.

        Raises:
            RuntimeError: If the capture did not deliver a frame (stream
                closed or read error). Raised before any split is rolled, so
                a dead stream does not create empty files.
        """
        ret, frame = cap.read()
        if not ret or frame is None:
            raise RuntimeError("failed to read a frame from the video capture")
        now = time()
        self.frameno += 1
        if now > self.new_split:
            self._start_new_split(frame)
        # OpenCV delivers BGR frames while scikit-video treats 3-channel
        # input as RGB, so convert for the encoder only; imshow still gets
        # the original BGR frame.
        self.writer.writeFrame(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        cv2.imshow("frames", frame)
        cv2.waitKey(1)

    def _clear_old_splits(self):
        """Delete split files older than split_size * split_history seconds."""
        cutoff = time() - self.split_size * self.split_history
        pattern = os.path.join(self.directory, self.split_prefix + ".*.*.mp4")
        for f in glob.glob(pattern):
            # The name ends in "<timestamp>.<split_size>.mp4"; splitting from
            # the right keeps dots in the directory or prefix from mattering.
            parts = f.rsplit(".", 3)
            try:
                ts = int(parts[-3])
            except ValueError:
                # Matches the glob but was not produced by this writer.
                continue
            if ts < cutoff:
                print("--", f)
                os.unlink(f)

    def release(self):
        """Finalise the current split, if one is open. Safe to call twice."""
        if self.writer is not None:
            # FFmpegWriter is finished with close(); it has no release().
            self.writer.close()
            self.writer = None
if __name__ == "__main__":
    # Record until interrupted, keeping roughly the last 5 x 10 seconds.
    w = SplitWriter(split_history=5, split_size=10)
    try:
        while True:
            w.write()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the recorder.
        pass
    finally:
        # Close the encoder so the last split file is finished, then free
        # the capture and the preview window.
        if w.writer is not None:
            w.writer.close()
        cap.release()
        cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment