# Source: GitHub gist 698c809d1f56c389dd359a0dbfbb3476 by @The0x539, created April 13, 2020.
import vapoursynth as vs
#class vs:
# VideoNode = list
from enum import Enum, auto
from typing import Callable, Sequence, Union
from dataclasses import dataclass
class SegType(Enum):
    """The kind of edit attached to a span of frames."""

    # Frames are passed through exactly as they appear in the source clip.
    UNTOUCHED = 1
    # Frames are played several times in a row; the count lives in the segment's data.
    REPEATED = 2
    # Frames are dropped from the result.
    OMITTED = 3
    # A zero-length span that carries extra material (the segment's data) to splice in.
    INSERTED = 4
@dataclass
class Segment:
    """A half-open span ``[start:stop]`` of source frames plus the edit applied to it."""

    # What happens to this span when the edits are realized.
    seg_type: SegType
    # First source frame covered by the span (inclusive).
    start: int
    # One past the last source frame covered by the span (exclusive).
    stop: int
    # Payload: the repeat count for REPEATED, the material to splice in for
    # INSERTED, otherwise unused.
    data: Union[vs.VideoNode, int, None] = None

    def _assert_untouched(self, action: str):
        """Raise TypeError unless this segment is still UNTOUCHED.

        ``action`` is only used to word the error message.
        """
        if self.seg_type is not SegType.UNTOUCHED:
            raise TypeError(f"[{self.start}:{self.stop}] is already {self.seg_type}; can't {action}")

    def cut(self, start: int, stop: int):
        """Split this untouched segment around ``[start:stop]``.

        Returns a 3-tuple of UNTOUCHED segments: the part before ``start``,
        the part ``[start:stop]`` itself, and the part after ``stop``.  The
        outer parts are zero-length when the cut touches this segment's edge.

        Raises:
            TypeError: if this segment has already been modified.
            ValueError: if ``[start:stop]`` is not inside this segment, or if
                ``start`` lies after ``stop``.
        """
        self._assert_untouched('cut')
        if start < self.start or stop > self.stop:
            raise ValueError(f'[{start}:{stop}] not entirely inside [{self.start}:{self.stop}]')
        if start > stop:
            # Without this check a reversed range would pass the bounds test
            # above and produce three overlapping, inverted segments.
            raise ValueError(f'[{start}:{stop}] is a reversed range')
        return (
            Segment(SegType.UNTOUCHED, self.start, start),
            Segment(SegType.UNTOUCHED, start, stop),
            Segment(SegType.UNTOUCHED, stop, self.stop),
        )

    def __mul__(self, amount: int):
        """Return a segment covering the same frames, played ``amount`` times.

        ``0`` yields an OMITTED segment, ``1`` yields this segment unchanged,
        and anything larger yields a REPEATED segment carrying the count.

        Raises:
            ValueError: if ``amount`` is negative.
            TypeError: if the segment has already been modified (and
                ``amount`` is not 1).
        """
        if amount < 0:
            raise ValueError(f"can't repeat [{self.start}:{self.stop}] {amount} times")
        if amount == 0:
            self._assert_untouched('omit')
            return Segment(SegType.OMITTED, self.start, self.stop)
        if amount == 1:
            # Repeating once is a no-op, even on an already-modified segment.
            return self
        self._assert_untouched('repeat')
        return Segment(SegType.REPEATED, self.start, self.stop, amount)

    def realize(self, clip: vs.VideoNode) -> vs.VideoNode:
        """Produce this segment's contribution to the output, taken from ``clip``.

        NOTE: an OMITTED segment yields an empty list rather than a clip, so
        callers that concatenate the results must either skip omitted
        segments or be working with list-like clips.

        Raises:
            ValueError: if ``seg_type`` is not one of the handled kinds.
        """
        if self.seg_type is SegType.OMITTED:
            return []
        if self.seg_type is SegType.UNTOUCHED:
            return clip[self.start:self.stop]
        if self.seg_type is SegType.REPEATED:
            return clip[self.start:self.stop] * self.data
        if self.seg_type is SegType.INSERTED:
            return self.data
        # Previously this fell through and returned None silently.
        raise ValueError(f'Unhandled segment type: {self.seg_type!r}')
class ClipMods:
    """An ordered list of segments describing edits to a clip of ``n`` frames.

    The object starts as a single UNTOUCHED segment ``[0:n]``.  Indexing with
    a frame number or a step-less slice splits segments as needed so that the
    requested frames form exactly one segment, which can then be read,
    replaced, repeated (``mods[a:b] *= k``) or deleted (``del mods[a:b]``).
    All indices are in source-clip frame numbers.
    """

    def __init__(self, n: int):
        """Start with one untouched segment covering all ``n`` frames."""
        self._length = n
        self.segments = [Segment(SegType.UNTOUCHED, 0, n)]

    def __len__(self) -> int:
        """Number of frames in the source clip."""
        return self._length

    def _normalize_idx(self, idx: int) -> int:
        """Resolve a possibly negative position to the range ``0..len(self)``.

        ``len(self)`` itself is accepted because positions are also used as
        exclusive slice ends and insertion points.

        Raises:
            IndexError: if the position falls outside the clip.
        """
        old_idx = idx
        if idx < 0:
            idx += len(self)
        if idx < 0 or idx > len(self):
            raise IndexError(f'{old_idx} is outside of clip ([0:{len(self)})')
        return idx

    def _normalize_slice(self, idx: Union[int, slice]) -> slice:
        """Turn a frame number or slice into a slice with concrete bounds.

        Raises:
            NotImplementedError: if a slice has a step.
            TypeError: if ``idx`` is neither an int nor a slice.
            IndexError: if a bound is outside the clip or the range is reversed.
        """
        if isinstance(idx, int):
            start = self._normalize_idx(idx)
            # Derive the end from the resolved start; computing ``idx + 1``
            # first would turn -1 into the inverted range [n-1:0].
            stop = self._normalize_idx(start + 1)
        elif isinstance(idx, slice):
            if idx.step is not None:
                raise NotImplementedError('indexing of ClipMods using steps is not implemented')
            # Only a *missing* bound gets a default.  A missing stop means
            # "through the end of the clip", and an explicit stop of 0 must
            # stay 0 (it is a valid insertion point).
            start = self._normalize_idx(0 if idx.start is None else idx.start)
            stop = self._normalize_idx(len(self) if idx.stop is None else idx.stop)
        else:
            raise TypeError(f'tried to index a ClipMods object with {repr(idx)}')
        if start > stop:
            raise IndexError(f'[{start}:{stop}] is not a valid range (start is after stop)')
        return slice(start, stop, None)

    def _ensure_gap(self, point: int):
        """Make sure a zero-length UNTOUCHED segment is the first segment at ``point``.

        Strictly inside a segment, that segment is split (which fails with
        TypeError if it was already modified).  On a boundary between
        segments nothing needs splitting: an empty placeholder is slotted in
        ahead of whatever already starts at ``point``, unless such a
        placeholder is already there.
        """
        for idx, seg in enumerate(self.segments):
            if seg.start < point < seg.stop:
                self.segments[idx:idx + 1] = seg.cut(point, point)
                return
            if point == seg.start or point == seg.stop:
                where = idx if point == seg.start else idx + 1
                current = self.segments[where] if where < len(self.segments) else None
                already_there = (
                    current is not None
                    and current.seg_type is SegType.UNTOUCHED
                    and current.start == current.stop == point
                )
                if not already_there:
                    self.segments.insert(where, Segment(SegType.UNTOUCHED, point, point))
                return
        raise IndexError(f'Couldn\'t find a segment encompassing [{point}:{point}]')

    def _ensure_segment(self, frames: slice):
        """Split segments so that ``frames`` corresponds to exactly one segment.

        Raises:
            IndexError: if no single segment contains ``frames``.
            TypeError: if the containing segment was already modified and
                would have to be split.
        """
        if frames.start == frames.stop:
            self._ensure_gap(frames.start)
            return
        for idx, seg in enumerate(self.segments):
            if seg.start == frames.start and frames.stop == seg.stop:
                return
            if seg.start <= frames.start and frames.stop <= seg.stop:
                head, body, tail = seg.cut(frames.start, frames.stop)
                # A cut that touches the segment's edge yields an empty head
                # or tail; keeping those would make validate() fail later.
                self.segments[idx:idx + 1] = [
                    piece for piece in (head, body, tail)
                    if piece is body or piece.start != piece.stop
                ]
                return
        raise IndexError(f'Couldn\'t find a segment encompassing [{frames.start}:{frames.stop}]')

    def _get_idx(self, frames: slice) -> int:
        """Return the list position of the first segment spanning exactly ``frames``.

        Raises:
            IndexError: if there is no such segment.
        """
        for idx, seg in enumerate(self.segments):
            if seg.start == frames.start and frames.stop == seg.stop:
                return idx
        raise IndexError(f'No segment found for [{frames.start}:{frames.stop}]')

    def __getitem__(self, idx: Union[int, slice]) -> Segment:
        """Return the segment for the given frames, splitting segments if needed."""
        frames = self._normalize_slice(idx)
        self._ensure_segment(frames)
        return self.segments[self._get_idx(frames)]

    def __setitem__(self, idx: Union[int, slice], val: Segment):
        """Replace the segment for the given frames with ``val``."""
        frames = self._normalize_slice(idx)
        self._ensure_segment(frames)
        self.segments[self._get_idx(frames)] = val

    def __delitem__(self, idx: Union[int, slice]):
        """Mark the given frames as omitted."""
        self[idx] *= 0

    def insert(self, frame: int, clip: vs.VideoNode):
        """Splice ``clip`` in immediately before source frame ``frame``.

        ``frame`` may be negative (counted from the end) or equal to
        ``len(self)`` to append after the last frame.
        """
        # Resolve negative positions up front so the stored segment carries
        # the same bounds as the slot it is placed in.
        frame = self._normalize_idx(frame)
        self[frame:frame] = Segment(SegType.INSERTED, frame, frame, clip)

    def validate(self):
        """Check that the segments tile ``[0:len(self)]`` without gaps or stray empties.

        Raises:
            RuntimeError: describing the first inconsistency found.
        """
        if self.segments[0].start != 0:
            raise RuntimeError(f'First segment starts at frame {self.segments[0].start} (should be 0)')
        elif self.segments[-1].stop != len(self):
            raise RuntimeError(f'Last segment ends at frame {self.segments[-1].stop} (should be {len(self)})')
        for i, (seg1, seg2) in enumerate(zip(self.segments, self.segments[1:])):
            if seg1.stop != seg2.start:
                raise RuntimeError(f'Segments {i}, {i+1} are non-adjacent: [{seg1.start}:{seg1.stop}], [{seg2.start}:{seg2.stop}]')
        for seg in self.segments:
            # Only insertions are allowed to cover zero source frames.
            if seg.stop == seg.start and seg.seg_type is not SegType.INSERTED:
                raise RuntimeError(f'Invalid segment: {seg}')
def black_magic(func: Callable[[ClipMods], None]) -> Callable[[vs.VideoNode], vs.VideoNode]:
    """Decorator turning a ClipMods-editing function into a clip-editing function.

    ``func`` receives a fresh ClipMods sized to the clip and records its
    edits on it.  The returned function validates those edits and builds the
    resulting clip by concatenating each segment's realized frames with ``+``.

    Raises (from the returned function):
        RuntimeError: if the recorded edits do not form a valid segment list.
    """
    from functools import wraps

    @wraps(func)
    def ret_func(clip: vs.VideoNode) -> vs.VideoNode:
        mods = ClipMods(len(clip))
        func(mods)
        mods.validate()
        # Omitted segments contribute nothing, so skip them instead of asking
        # them for a piece; their placeholder is a plain list, which only
        # list-like clips could be added to.
        pieces = [
            segment.realize(clip)
            for segment in mods.segments
            if segment.seg_type is not SegType.OMITTED
        ]
        if not pieces:
            # Everything was omitted.  For list-like clips this is an empty
            # list; clip types that cannot be empty are expected to raise here.
            return clip[0:0]
        # Join with ``+`` starting from the first piece.  ``sum(..., [])``
        # would start from a list and therefore only works for list clips.
        result = pieces[0]
        for piece in pieces[1:]:
            result = result + piece
        return result

    return ret_func
#inp = list(range(50))
#
#@black_magic
#def example(clip):
# del clip[48]
# del clip[2]
# clip[10] *= 3
# del clip[15:17]
# clip[20:23] *= 2
# clip.insert(33, [1283, 59872])
#
#outp = example(inp)
#print(outp)
# (GitHub page footer removed; it was not part of the script.)