Skip to content

Instantly share code, notes, and snippets.

@BrianPugh
Forked from Eugeny/readme.md
Last active November 28, 2022 16:50
Show Gist options
  • Save BrianPugh/8de1a6f124516ec2a2a61caadcdf039a to your computer and use it in GitHub Desktop.
Save BrianPugh/8de1a6f124516ec2a2a61caadcdf039a to your computer and use it in GitHub Desktop.
Frame accurate video reader - OpenCV VideoCapture replacement

OpenCV's VideoCapture is broken and hasn't been fixed for the last 5 years: opencv/opencv#9053

This is a PyAV based replacement. Unlike other implementations it can seek at any time.

Install PyAV:

pip install av

How to use:

reader = VideoReader('video.mp4')
reader.seek(len(reader) - 100)  # frame number 
while True:
  frame = reader.read()
  if frame is None:
    break
    
  # frame is an ndarray - do something with it
  print(f'frame {reader.position}: {frame}')
  
reader.release()
import av
import itertools
import cv2
class VideoReader:
    """Sequential video frame reader with frame-index seeking, built on PyAV.

    The reader opens the first video stream of a container, decodes frames
    one at a time via :meth:`read`, and can be repositioned to an arbitrary
    frame index via :meth:`seek`.

    Attributes:
        max_seek_search: Maximum number of frames decoded after a container
            seek while looking for the requested frame.
        container: The opened PyAV container.
        stream: The first video stream of ``container``.
        position: Index of the next frame :meth:`read` will return, or ``-1``
            if the last seek ran off the end of the stream.
        end: ``True`` once the stream has been exhausted.

    The instance can be used as a context manager; the container is closed
    on exit.
    """

    # Maps the stream's "rotate" metadata value (in degrees) to the OpenCV
    # rotation code that read() applies to each decoded frame.
    ROTATE_LOOKUP = {
        90: cv2.ROTATE_90_CLOCKWISE,
        180: cv2.ROTATE_180,
        270: cv2.ROTATE_90_COUNTERCLOCKWISE,
    }

    def __init__(self, path, max_seek_search=500):
        """Open ``path`` and position the reader at frame 0.

        Args:
            path: Location of the video file; converted with ``str()`` before
                being handed to PyAV.
            max_seek_search: Upper bound on frames decoded per seek while
                searching for the target frame.
        """
        self.max_seek_search = max_seek_search
        self.container = av.container.open(str(path))
        self.position = 0
        self.end = False
        try:
            self.stream = self.container.streams.video[0]
            self.seek(0)
        except Exception:
            # Do not leak the open container when the file has no video
            # stream or the initial seek fails.
            self.container.close()
            raise

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the container; exceptions are never suppressed."""
        self.release()
        return False

    def __len__(self):
        """Return the frame count reported by the video stream."""
        return self.stream.frames

    def iter_frames(self):
        """Yield decoded frames from the container's current position."""
        for packet in self.container.demux(self.stream):
            for frame in packet.decode():
                yield frame

    def release(self):
        """Close the underlying container."""
        self.container.close()

    def close(self):
        """Alias of :meth:`release`."""
        self.release()

    def read(self):
        """Decode and return the next frame.

        Returns:
            The frame converted to RGB as an ndarray, rotated according to
            the stream's ``rotate`` metadata when present, or ``None`` when
            the stream is exhausted (``self.end`` is then set to ``True``).

        Raises:
            NotImplementedError: If the ``rotate`` metadata holds a non-zero
                angle that is not a key of :attr:`ROTATE_LOOKUP`.
        """
        try:
            frame = next(self.iter)
        except StopIteration:
            self.end = True
            return None
        self.position += 1
        ndarray = frame.to_rgb().to_ndarray()
        # Currently, pyav doesn't respect the rotate metadata
        rotate = int(self.stream.metadata.get("rotate", 0))
        if rotate:
            try:
                rotate_code = self.ROTATE_LOOKUP[rotate]
            except KeyError:
                # The KeyError is an implementation detail; hide it from the
                # traceback.
                raise NotImplementedError(
                    f"Rotation {rotate} not implemented."
                ) from None
            ndarray = cv2.rotate(ndarray, rotate_code)
        return ndarray

    def _frame_to_pts(self, frame):
        """Convert a frame index to a timestamp in the stream's time base.

        Uses ``duration / frames`` when the stream reports both. Otherwise
        falls back to the average frame rate and time base, because some
        containers report a frame count of 0 or no duration. Like the primary
        formula, the fallback does not account for a non-zero stream start
        time.

        Raises:
            RuntimeError: If the stream exposes neither a usable frame
                count/duration pair nor a frame rate and time base.
        """
        stream = self.stream
        if stream.frames and stream.duration is not None:
            return int(frame * stream.duration / stream.frames)
        rate = stream.average_rate
        time_base = stream.time_base
        if rate and time_base:
            # seconds = frame / rate; pts = seconds / time_base
            return int(frame / (rate * time_base))
        raise RuntimeError(
            "Cannot convert a frame index to a timestamp: the stream reports "
            "neither a frame count with duration nor a frame rate."
        )

    def seek(self, frame):
        """Position the reader so the next :meth:`read` returns ``frame``.

        The container is asked to seek to the timestamp computed for
        ``frame``; frames are then decoded and discarded until one whose
        ``pts`` reaches that timestamp is found. That frame is pushed back
        so it is the next one returned by :meth:`read`.

        If the stream ends before such a frame is found, ``self.end`` is set
        to ``True`` and ``self.position`` to ``-1``.

        Args:
            frame: Zero-based index of the frame to seek to.

        Raises:
            RuntimeError: If the target is not found within
                ``max_seek_search`` decoded frames, or if the frame index
                cannot be converted to a timestamp.
        """
        pts = self._frame_to_pts(frame)
        self.container.seek(pts, stream=self.stream)
        self.iter = self.iter_frames()
        for j, f in enumerate(self.iter):
            if j > self.max_seek_search:
                raise RuntimeError(
                    f"Did not find target within {self.max_seek_search} "
                    "frames of seek."
                )
            if f.pts is None:
                # A frame without a timestamp cannot be compared against the
                # target; skip it instead of failing with a TypeError.
                continue
            # One tick of slack; presumably absorbs the int() truncation in
            # the pts computation.
            if f.pts >= pts - 1:
                self.end = False
                self.position = frame
                # Put the matching frame back in front of the iterator.
                self.iter = itertools.chain([f], self.iter)
                return
        self.end = True
        self.position = -1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment