-
-
Save kageru/0e4b4f0e8b443c59552b66d5f8b55d93 to your computer and use it in GitHub Desktop.
Snap start and end times of ASS subtitles to scene changes using WWXD
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.6 | |
"""Snap start and end times of ASS subtitles to scene changes using WWXD. | |
Does not work with variable frame rate (VFR). | |
usage: snap_scenechanges.py [-h] [--epsilon EPSILON] [-o OUTPUT] [-v] | |
sub_path video_path | |
positional arguments: | |
sub_path Path to subtitle file. | |
video_path Path to video file. | |
optional arguments: | |
-h, --help show this help message and exit | |
--epsilon EPSILON Number of frames to search for scene changes around | |
each frame. | |
-o OUTPUT, --output OUTPUT | |
Output path. By default, the input path with | |
`_snapped` appended. | |
-v, --verbose Increase verbosity. | |
Requires: | |
- Vapoursynth | |
- Vapoursynth WWXD plugin (https://github.com/dubhater/vapoursynth-wwxd) | |
- lsmas | |
- python-ass (https://github.com/rfw/python-ass) | |
""" | |
# Copyright 2017 FichteFoll <[email protected]> | |
# | |
# Permission to use, copy, modify, and/or distribute this software for any | |
# purpose with or without fee is hereby granted, provided that the above | |
# copyright notice and this permission notice appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY | |
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION | |
# OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN | |
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# | |
# Slight modifications: [email protected] | |
import argparse | |
from collections import defaultdict | |
from itertools import cycle, islice | |
import logging | |
import math | |
from pathlib import Path | |
import sys | |
from datetime import timedelta | |
from typing import List, Optional, Iterable | |
from multiprocessing import Pool | |
import ass | |
import vapoursynth | |
# Module-level logger; its handler and level are configured in main().
l = logging.getLogger(__name__)
core = vapoursynth.get_core()

# The command line is parsed at import time (not inside main()) so that the
# parsed parameters and the clip below are available as module globals to
# snap_line() in every worker process.
parser = argparse.ArgumentParser(description="Snap start and end times of subtitles to scene changes using WWXD.")
parser.add_argument("sub_path", type=Path, help="Path to subtitle file.")
parser.add_argument("video_path", type=Path, help="Path to video file.")
parser.add_argument("--epsilon", type=int, default=7, help="Number of frames to search for scene changes around each frame.")
parser.add_argument("-o", "--output", type=Path, help="Output path. By default, the input path with `_snapped` appended.")
parser.add_argument("-v", "--verbose", action='store_true', help="Increase verbosity.")
params = parser.parse_args()
epsilon = params.epsilon

# Start by making a resized and wwxd'd clip.
# We resize to speed up the wwxd filter.
# The target resolution is kind of arbitrary,
# but seems to produce the "best results"
# while being reasonably fast.
# Note that this would technically work with ffms2 (which is faster),
# but ffms2 is known to have issues with frame-precise access and indexing of
# m2ts files, so lsmas is used instead.
# There are some limitations regarding multiprocessing under Windows,
# and defining snap_line() as a global function and these as global parameters
# seems to be one of the easier workarounds, albeit an ugly one.
l.info("Opening video...")
clip = core.lsmas.LWLibavSource(str(params.video_path)).resize.Bilinear(640, 360, format=vapoursynth.YUV420P8).wwxd.WWXD()

# Use the clip's real rational frame rate. The denominator used to be
# hard-coded to 1001, which is only correct for NTSC-style rates such as
# 24000/1001 and silently breaks e.g. 25/1 or 24/1 material.
framerate = clip.fps_num / clip.fps_den
# Round-robin interleaving (same behaviour as the classic itertools recipe).
def roundrobin(*iterables: Iterable) -> Iterable:
    """Yield one item from each iterable in turn until all are exhausted.

    roundrobin('ABC', 'D', 'EF') --> A D E B F C
    """
    active = [iter(source) for source in iterables]
    while active:
        survivors = []
        for iterator in active:
            try:
                item = next(iterator)
            except StopIteration:
                # This source is done; drop it from the following rounds.
                continue
            survivors.append(iterator)
            yield item
        active = survivors
def negate(iterable: Iterable) -> Iterable:
    """Lazily yield the arithmetic negation of every item of ``iterable``."""
    yield from (-value for value in iterable)
def time_to_frame(time: timedelta, framerate: float, floor: bool = True) -> int:
    """Convert a timestamp to a frame number at a constant frame rate.

    The fractional frame position ``time * framerate`` is rounded down when
    ``floor`` is true and rounded up otherwise.
    """
    rounding = math.floor if floor else math.ceil
    return rounding(time.total_seconds() * framerate)
def frame_to_time(frame: int, framerate: float, floor: bool = True) -> timedelta:
    """Convert a frame number to a timestamp rounded to centiseconds.

    With ``floor`` true the timestamp lies half a frame *before* ``frame``
    (clamped so it never goes below zero); otherwise it lies half a frame
    *after* ``frame``.
    """
    position = max(0, frame - 0.5) if floor else frame + 0.5
    # Round to centiseconds here because python-ass floors by default.
    return timedelta(seconds=round(position / framerate, 2))
# Cache of frames already identified as scene changes, keyed by clip.
_scene_change_cache = defaultdict(set)


def _search_offsets(epsilon: int, prefer_forward: bool):
    """Yield +/-1, +/-2, ... +/-epsilon, trying the preferred direction first."""
    for distance in range(1, epsilon + 1):
        if prefer_forward:
            yield distance
            yield -distance
        else:
            yield -distance
            yield distance


def find_nearest_scenechange(clip, frame: int, epsilon: int, prefer_forward: bool = False) \
        -> Optional[int]:
    """Return the scene-change frame closest to ``frame``, or None.

    Frames at distance 1..``epsilon`` from ``frame`` are probed in order of
    increasing distance; ``frame`` itself is not probed. At equal distance the
    earlier frame is tried first unless ``prefer_forward`` is true.

    WWXD sometimes marks two adjacent frames as scene changes; in that case
    the second of the two is returned.

    ``clip`` must expose ``num_frames`` and ``get_frame(n).props.Scenechange``.
    Candidate frames outside ``0 <= n < clip.num_frames`` are skipped instead
    of being requested from the clip.
    """
    known = _scene_change_cache[clip]
    num_frames = clip.num_frames

    for offset in _search_offsets(epsilon, prefer_forward):
        test_frame = frame + offset
        if not 0 <= test_frame < num_frames:
            continue

        if test_frame in known:
            return test_frame + 1 if test_frame + 1 in known else test_frame

        if not clip.get_frame(test_frame).props.Scenechange:
            continue

        # Only look at the following frame when it actually exists.
        has_next = test_frame + 1 < num_frames
        if has_next and clip.get_frame(test_frame + 1).props.Scenechange:
            known.add(test_frame + 1)
            return test_frame + 1

        # Single scene change: return the frame itself, matching what the
        # cached lookup above returns for the same frame.
        known.add(test_frame)
        return test_frame

    return None
def snap_line(event):
    """Snap one subtitle event's start and end time to nearby scene changes.

    Returns None for events that are not dialogue lines. For dialogue lines
    the (possibly adjusted) event is returned: when this function runs in a
    worker process it operates on a pickled copy of the event, so the caller
    only sees the new times through the return value.
    """
    if event.TYPE != "Dialogue":
        return None

    l.debug("Checking line: %s", event.dump())
    # Logging from the worker processes does not come through reliably, so
    # verbose output is additionally printed directly.
    if params.verbose:
        print(event.text)

    # Timestamps (should) always point between two frames.
    #
    # Our goal here is to have the first frame *after* the start time
    # and the first frame *after* the end time to be a scene change.
    # Thus, we ceil the timestamps when loading
    # but floor the timestamps of scene changes we found when dumping.
    start_frame = time_to_frame(event.start, framerate, floor=False)
    end_frame = time_to_frame(event.end, framerate, floor=False)

    nearest_start = find_nearest_scenechange(clip, start_frame, epsilon)
    nearest_end = find_nearest_scenechange(clip, end_frame, epsilon)

    if nearest_start is not None:
        event.start = frame_to_time(nearest_start, framerate, floor=True)
        if params.verbose:
            print(" Adjusted start by {:d} frames".format(nearest_start - start_frame))
        l.debug(" Adjusted start by %+d frames", nearest_start - start_frame)

    if nearest_end is not None:
        event.end = frame_to_time(nearest_end, framerate, floor=True)
        if params.verbose:
            print(" Adjusted end by {:d} frames".format(nearest_end - end_frame))
        l.debug(" Adjusted end by %+d frames", nearest_end - end_frame)

    l.debug(" New line: %s", event.dump())
    return event


def snap_keyframes(script, video_path: Path, epsilon: int):
    """Snap every dialogue line of ``script`` to scene changes, in place.

    ``video_path`` and ``epsilon`` are accepted for interface compatibility;
    the workers use the module-level clip and parameters instead.
    """
    l.info("Video frame rate: %.3f", framerate)
    l.info("Snapping...")

    events = list(script.events)

    # In my tests, 4 processes were sufficient to saturate most systems,
    # even if the number of logical threads was significantly higher.
    # Since the number of processes linearly increases memory consumption,
    # we do not want to spawn more than necessary.
    pool = Pool(4)
    try:
        snapped_events = pool.map(snap_line, events)
    finally:
        pool.close()
        pool.join()

    # Pool.map() hands each worker a pickled copy of the event, so the
    # adjustments have to be copied back onto the events of this process.
    for original, snapped in zip(events, snapped_events):
        if snapped is not None:
            original.start = snapped.start
            original.end = snapped.end

    # NOTE(review): the cache is filled inside the worker processes, so the
    # copy in this process is presumably empty at this point - confirm.
    l.debug("List of frames with scene changes: %s",
            sorted(_scene_change_cache[clip]))
def main(args: List[str] = None) -> int:
    """Run the snapping tool and return a process exit code.

    ``args`` is accepted for interface compatibility but is not used: the
    command line has already been parsed into the module-level ``params``.

    Returns 0 on success and 1 when the subtitle file is not an ``.ass`` file.
    """
    # configure logging
    l.addHandler(logging.StreamHandler())
    l.setLevel(logging.DEBUG if params.verbose else logging.INFO)

    # verify args (the extension check is case-insensitive)
    if params.sub_path.suffix.lower() != ".ass":
        l.error("Only works with Advanced Substation Alpha subtitles (.ass)")
        return 1

    if not params.output:
        # Build "<stem>_snapped<suffix>". Path.name already contains the
        # suffix, so using it here would produce "foo.ass_snapped.ass".
        params.output = params.sub_path.with_name(
            "{:s}_snapped{:s}".format(params.sub_path.stem, params.sub_path.suffix))

    # load ass
    l.info("Parsing subtitles...")
    # utf-8-sig strips a leading BOM if present (this script writes one
    # itself) and reads BOM-less UTF-8 unchanged.
    with params.sub_path.open(encoding='utf-8-sig') as f:
        script = ass.parse(f)

    # do the work
    snap_keyframes(script, params.video_path, params.epsilon)

    # save new file
    l.info("Writing subtitles...")
    with params.output.open('w', encoding='utf-8-sig') as f:  # I think the BOM is required
        script.dump_file(f)

    return 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment