Created
April 13, 2020 16:20
-
-
Save The0x539/698c809d1f56c389dd359a0dbfbb3476 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import vapoursynth as vs | |
#class vs: | |
# VideoNode = list | |
from enum import Enum, auto | |
from typing import Callable, Sequence, Union | |
from dataclasses import dataclass | |
class SegType(Enum): | |
UNTOUCHED = auto() | |
REPEATED = auto() | |
OMITTED = auto() | |
INSERTED = auto() | |
@dataclass | |
class Segment: | |
seg_type: SegType | |
start: int | |
stop: int | |
data: Union[vs.VideoNode, int, None] = None | |
def _assert_untouched(self, action: str): | |
if self.seg_type is not SegType.UNTOUCHED: | |
raise TypeError(f'[{self.start}:{self.stop}] is already {self.seg_type}; can\'t {action}') | |
def cut(self, start: int, stop: int): | |
self._assert_untouched('cut') | |
if start < self.start or stop > self.stop: | |
raise ValueError(f'[{start}:{stop}] not entirely inside [{self.start}:{self.stop}]') | |
return (Segment(SegType.UNTOUCHED, self.start, start), | |
Segment(SegType.UNTOUCHED, start, stop), | |
Segment(SegType.UNTOUCHED, stop, self.stop)) | |
def __mul__(self, amount: int): | |
if amount == 0: | |
self._assert_untouched('omit') | |
return Segment(SegType.OMITTED, self.start, self.stop) | |
elif amount == 1: | |
return self | |
self._assert_untouched('repeat') | |
return Segment(SegType.REPEATED, self.start, self.stop, amount) | |
def realize(self, clip: vs.VideoNode) -> vs.VideoNode: | |
if self.seg_type is SegType.OMITTED: | |
return [] | |
elif self.seg_type is SegType.UNTOUCHED: | |
return clip[self.start:self.stop] | |
elif self.seg_type is SegType.REPEATED: | |
return clip[self.start:self.stop] * self.data | |
elif self.seg_type is SegType.INSERTED: | |
return self.data | |
class ClipMods: | |
def __init__(self, n: int): | |
self._length = n | |
self.segments = [Segment(SegType.UNTOUCHED, 0, n)] | |
def __len__(self) -> int: | |
return self._length | |
def _normalize_idx(self, idx: int) -> int: | |
old_idx = idx | |
if idx < 0: | |
idx += len(self) | |
if idx < 0 or idx > len(self): | |
raise IndexError(f'{old_idx} is outside of clip ([0:{len(self)})') | |
return idx | |
def _normalize_slice(self, idx: Union[int, slice]) -> slice: | |
if isinstance(idx, int): | |
start = idx | |
stop = idx + 1 | |
elif isinstance(idx, slice): | |
if idx.step is not None: | |
raise NotImplementedError(f'indexing of ClipMods using steps is not implemented') | |
start = idx.start or 0 | |
stop = idx.stop or -1 | |
else: | |
raise TypeError(f'tried to index a ClipMods object with {repr(idx)}') | |
start = self._normalize_idx(start) | |
stop = self._normalize_idx(stop) | |
return slice(start, stop, None) | |
def _ensure_segment(self, frames: slice): | |
for idx, seg in enumerate(self.segments): | |
if seg.start == frames.start and frames.stop == seg.stop: | |
return | |
elif seg.start <= frames.start and frames.stop <= seg.stop: | |
self.segments[idx:idx+1] = seg.cut(frames.start, frames.stop) | |
return | |
else: | |
raise IndexError(f'Couldn\'t find a segment encompassing [{frames.start}:{frames.stop}]') | |
def _get_idx(self, frames: slice) -> int: | |
for idx, seg in enumerate(self.segments): | |
if seg.start == frames.start and frames.stop == seg.stop: | |
return idx | |
else: | |
raise IndexError(f'No segment found for [{frames.start}:{frames.stop}]') | |
def __getitem__(self, idx: Union[int, slice]) -> Segment: | |
frames = self._normalize_slice(idx) | |
self._ensure_segment(frames) | |
idx = self._get_idx(frames) | |
return self.segments[idx] | |
def __setitem__(self, idx: Union[int, slice], val: Segment): | |
frames = self._normalize_slice(idx) | |
self._ensure_segment(frames) | |
idx = self._get_idx(frames) | |
self.segments[idx] = val | |
def __delitem__(self, idx: Union[int, slice]): | |
self[idx] *= 0 | |
def insert(self, frame: int, clip: vs.VideoNode): | |
self[frame:frame] = Segment(SegType.INSERTED, frame, frame, clip) | |
def validate(self): | |
if self.segments[0].start != 0: | |
raise RuntimeError(f'First segment starts at frame {self.segments[0].start} (should be 0)') | |
elif self.segments[-1].stop != len(self): | |
raise RuntimeError(f'Last segment ends at frame {self.segments[-1].stop} (should be {len(self)})') | |
for i in range(len(self.segments) - 1): | |
seg1, seg2 = self.segments[i:i+2] | |
if seg1.stop != seg2.start: | |
raise RuntimeError(f'Segments {i}, {i+1} are non-adjacent: [{seg1.start}:{seg1.stop}], [{seg2.start}:{seg2.stop}]') | |
for seg in self.segments: | |
if seg.stop == seg.start and seg.seg_type is not SegType.INSERTED: | |
raise RuntimeError(f'Invalid segment: {seg}') | |
def black_magic(func: Callable[[ClipMods], None]) -> Callable[[vs.VideoNode], vs.VideoNode]: | |
def ret_func(clip: vs.VideoNode) -> vs.VideoNode: | |
mods = ClipMods(len(clip)) | |
func(mods) | |
mods.validate() | |
return sum((segment.realize(clip) for segment in mods.segments), []) | |
return ret_func | |
#inp = list(range(50)) | |
# | |
#@black_magic | |
#def example(clip): | |
# del clip[48] | |
# del clip[2] | |
# clip[10] *= 3 | |
# del clip[15:17] | |
# clip[20:23] *= 2 | |
# clip.insert(33, [1283, 59872]) | |
# | |
#outp = example(inp) | |
#print(outp) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment