Last active
December 28, 2024 17:16
-
-
Save Vodes/300997ac1473ac2b9d08d62c169a26de to your computer and use it in GitHub Desktop.
Create Comparisons for multiple video clips with automatic upscaling and uploading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
I do not provide support for this unless its an actual error in the code and not related to your setup. | |
You'll need: | |
- Vapoursynth (this was written & tested on R53 with Python 3.9.6) | |
- pip3 install pathlib anitopy pyperclip requests requests_toolbelt | |
- https://github.com/HolyWu/L-SMASH-Works/releases/latest/ (Install to your usual Vapoursynth plugins64 folder) | |
- (if using ffmpeg) ffmpeg installed & in path | |
How to use: | |
- Drop comp.py into a folder with the video files you want | |
- (Recommended) Rename your files to have the typical [Group] Show - EP.mkv naming, since the script will try to parse the group and show name. | |
e.g. [JPBD] Youjo Senki - 01.m2ts ("fakegroup" for JPBDs so the imagefiles will have JPBD in the name) | |
- Change vars below (trim if needed) | |
- Run comp.py | |
""" | |
### Change these if you need to | |
# Ram Limit (in MB) | |
ram_limit = 6000 | |
# Framecounts | |
frame_count_dark = 8 | |
frame_count_bright = 4 | |
# Automatically upload to slow.pics | |
slowpics = True | |
# Output slow.pics link to discord webhook (disabled if empty) | |
webhook_url = r"" | |
# Upscale videos to make the clips match the highest found res | |
upscale = True | |
# Use ffmpeg as the image renderer (ffmpeg needs to be in path) | |
ffmpeg = True | |
""" | |
Used to trim clips. | |
Example: | |
trim_dict = {0: 1000, 1: 1046} | |
Means: | |
First clip should start at frame 1000 | |
Second clip should start at frame 1046 | |
(Clips are taken in alphabetical order of the filenames) | |
""" | |
trim_dict = {} | |
### Not recommended to change stuff below | |
import os | |
import time | |
import ctypes | |
import random | |
import pathlib | |
import requests | |
import anitopy as ani | |
import pyperclip as pc | |
import vapoursynth as vs | |
from requests import Session | |
from functools import partial | |
from threading import Condition | |
from concurrent.futures import Future | |
from requests_toolbelt import MultipartEncoder | |
from typing import Any, Dict, List, Optional, BinaryIO, TextIO, Union, Callable, Type, TypeVar, Sequence, cast | |
RenderCallback = Callable[[int, vs.VideoFrame], None] | |
VideoProp = Union[int, Sequence[int],float, Sequence[float],str, Sequence[str],vs.VideoNode, Sequence[vs.VideoNode],vs.VideoFrame, Sequence[vs.VideoFrame],Callable[..., Any], Sequence[Callable[..., Any]]] | |
T = TypeVar("T", bound=VideoProp) | |
vs.core.max_cache_size = ram_limit | |
def lazylist(clip: vs.VideoNode, dark_frames: int = 8, light_frames: int = 4, seed: int = 20202020, diff_thr: int = 15): | |
""" | |
Blame Sea for what this shits out | |
A function for generating a list of frames for comparison purposes. | |
Works by running `core.std.PlaneStats()` on the input clip, | |
iterating over all frames, and sorting all frames into 2 lists | |
based on the PlaneStatsAverage value of the frame. | |
Randomly picks frames from both lists, 8 from `dark` and 4 | |
from `light` by default. | |
:param clip: Input clip | |
:param dark_frame: Number of dark frames | |
:param light_frame: Number of light frames | |
:param seed: seed for `random.sample()` | |
:param diff_thr: Minimum distance between each frames (In seconds) | |
:return: List of dark and light frames | |
""" | |
dark = [] | |
light = [] | |
def checkclip(n, f, clip): | |
avg = f.props["PlaneStatsAverage"] | |
if 0.062746 <= avg <= 0.380000: | |
dark.append(n) | |
elif 0.450000 <= avg <= 0.800000: | |
light.append(n) | |
return clip | |
s_clip = clip.std.PlaneStats() | |
eval_frames = vs.core.std.FrameEval( | |
clip, partial(checkclip, clip=s_clip), prop_src=s_clip | |
) | |
print('Rendering clip to get frames...') | |
clip_async_render(eval_frames) | |
dark.sort() | |
light.sort() | |
dark_dedupe = [dark[0]] | |
light_dedupe = [light[0]] | |
thr = round(clip.fps_num / clip.fps_den * diff_thr) | |
lastvald = dark[0] | |
lastvall = light[0] | |
for i in range(1, len(dark)): | |
checklist = dark[0:i] | |
x = dark[i] | |
for y in checklist: | |
if x >= y + thr and x >= lastvald + thr: | |
dark_dedupe.append(x) | |
lastvald = x | |
break | |
for i in range(1, len(light)): | |
checklist = light[0:i] | |
x = light[i] | |
for y in checklist: | |
if x >= y + thr and x >= lastvall + thr: | |
light_dedupe.append(x) | |
lastvall = x | |
break | |
if len(dark_dedupe) > dark_frames: | |
random.seed(seed) | |
dark_dedupe = random.sample(dark_dedupe, dark_frames) | |
if len(light_dedupe) > light_frames: | |
random.seed(seed) | |
light_dedupe = random.sample(light_dedupe, light_frames) | |
return dark_dedupe + light_dedupe | |
def _get_slowpics_header(content_length: str, content_type: str, sess: Session) -> Dict[str, str]: | |
""" | |
Stolen from vardefunc | |
""" | |
return { | |
"Accept": "*/*", | |
"Accept-Encoding": "gzip, deflate, br", | |
"Accept-Language": "en-US,en;q=0.5", | |
"Content-Length": content_length, | |
"Content-Type": content_type, | |
"Origin": "https://slow.pics/", | |
"Referer": "https://slow.pics/comparison", | |
"Sec-Fetch-Mode": "cors", | |
"Sec-Fetch-Site": "same-origin", | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"X-XSRF-TOKEN": sess.cookies.get_dict()["XSRF-TOKEN"] | |
} | |
class RenderContext: | |
""" | |
Stolen from lvsfunc | |
Contains info on the current render operation. | |
""" | |
clip: vs.VideoNode | |
queued: int | |
frames: Dict[int, vs.VideoFrame] | |
frames_rendered: int | |
timecodes: List[float] | |
condition: Condition | |
def __init__(self, clip: vs.VideoNode, queued: int) -> None: | |
self.clip = clip | |
self.queued = queued | |
self.frames = {} | |
self.frames_rendered = 0 | |
self.timecodes = [0.0] | |
self.condition = Condition() | |
def get_prop(frame: vs.VideoFrame, key: str, t: Type[T]) -> T: | |
""" | |
Stolen from lvsfunc | |
Gets FrameProp ``prop`` from frame ``frame`` with expected type ``t`` | |
to satisfy the type checker. | |
:param frame: Frame containing props | |
:param key: Prop to get | |
:param t: Type of prop | |
:return: frame.prop[key] | |
""" | |
try: | |
prop = frame.props[key] | |
except KeyError: | |
raise KeyError(f"get_prop: 'Key {key} not present in props'") | |
if not isinstance(prop, t): | |
raise ValueError(f"get_prop: 'Key {key} did not contain expected type: Expected {t} got {type(prop)}'") | |
return prop | |
def clip_async_render(clip: vs.VideoNode, | |
outfile: Optional[BinaryIO] = None, | |
timecodes: Optional[TextIO] = None, | |
progress: Optional[str] = "Rendering clip...", | |
callback: Union[RenderCallback, List[RenderCallback], None] = None) -> List[float]: | |
""" | |
Stolen from lvsfunc | |
Render a clip by requesting frames asynchronously using clip.get_frame_async, | |
providing for callback with frame number and frame object. | |
This is mostly a re-implementation of VideoNode.output, but a little bit slower since it's pure python. | |
You only really need this when you want to render a clip while operating on each frame in order | |
or you want timecodes without using vspipe. | |
:param clip: Clip to render. | |
:param outfile: Y4MPEG render output BinaryIO handle. If None, no Y4M output is performed. | |
Use ``sys.stdout.buffer`` for stdout. (Default: None) | |
:param timecodes: Timecode v2 file TextIO handle. If None, timecodes will not be written. | |
:param progress: String to use for render progress display. | |
If empty or ``None``, no progress display. | |
:param callback: Single or list of callbacks to be preformed. The callbacks are called | |
when each sequential frame is output, not when each frame is done. | |
Must have signature ``Callable[[int, vs.VideoNode], None]`` | |
See :py:func:`lvsfunc.comparison.diff` for a use case (Default: None). | |
:return: List of timecodes from rendered clip. | |
""" | |
cbl = [] if callback is None else callback if isinstance(callback, list) else [callback] | |
ctx = RenderContext(clip, vs.core.num_threads) | |
bad_timecodes: bool = False | |
def cb(f: Future[vs.VideoFrame], n: int) -> None: | |
ctx.frames[n] = f.result() | |
nn = ctx.queued | |
while ctx.frames_rendered in ctx.frames: | |
nonlocal timecodes | |
nonlocal bad_timecodes | |
frame = ctx.frames[ctx.frames_rendered] | |
# if a frame is missing timing info, clear timecodes because they're worthless | |
if ("_DurationNum" not in frame.props or "_DurationDen" not in frame.props) and not bad_timecodes: | |
bad_timecodes = True | |
if timecodes: | |
timecodes.seek(0) | |
timecodes.truncate() | |
timecodes = None | |
ctx.timecodes = [] | |
print("clip_async_render: frame missing duration information, discarding timecodes") | |
elif not bad_timecodes: | |
ctx.timecodes.append(ctx.timecodes[-1] | |
+ get_prop(frame, "_DurationNum", int) | |
/ get_prop(frame, "_DurationDen", int)) | |
finish_frame(outfile, timecodes, ctx) | |
[cb(ctx.frames_rendered, ctx.frames[ctx.frames_rendered]) for cb in cbl] | |
del ctx.frames[ctx.frames_rendered] # tfw no infinite memory | |
ctx.frames_rendered += 1 | |
# enqueue a new frame | |
if nn < clip.num_frames: | |
ctx.queued += 1 | |
cbp = partial(cb, n=nn) | |
clip.get_frame_async(nn).add_done_callback(cbp) # type: ignore | |
ctx.condition.acquire() | |
ctx.condition.notify() | |
ctx.condition.release() | |
if outfile: | |
if clip.format is None: | |
raise ValueError("clip_async_render: 'Cannot render a variable format clip to y4m!'") | |
if clip.format.color_family not in (vs.YUV, vs.GRAY): | |
raise ValueError("clip_async_render: 'Can only render YUV and GRAY clips to y4m!'") | |
if clip.format.color_family == vs.GRAY: | |
y4mformat = "mono" | |
else: | |
ss = (clip.format.subsampling_w, clip.format.subsampling_h) | |
if ss == (1, 1): | |
y4mformat = "420" | |
elif ss == (1, 0): | |
y4mformat = "422" | |
elif ss == (0, 0): | |
y4mformat = "444" | |
elif ss == (2, 2): | |
y4mformat = "410" | |
elif ss == (2, 0): | |
y4mformat = "411" | |
elif ss == (0, 1): | |
y4mformat = "440" | |
else: | |
raise ValueError("clip_async_render: 'What have you done'") | |
y4mformat = f"{y4mformat}p{clip.format.bits_per_sample}" if clip.format.bits_per_sample > 8 else y4mformat | |
header = f"YUV4MPEG2 C{y4mformat} W{clip.width} H{clip.height} " \ | |
f"F{clip.fps.numerator}:{clip.fps.denominator} Ip A0:0\n" | |
outfile.write(header.encode("utf-8")) | |
if timecodes: | |
timecodes.write("# timestamp format v2\n") | |
ctx.condition.acquire() | |
# seed threads | |
try: | |
for n in range(min(clip.num_frames, vs.core.num_threads)): | |
cbp = partial(cb, n=n) # lambda won't bind the int immediately | |
clip.get_frame_async(n).add_done_callback(cbp) # type: ignore | |
while ctx.frames_rendered != clip.num_frames: | |
ctx.condition.wait() | |
finally: | |
return ctx.timecodes # might as well | |
def finish_frame(outfile: Optional[BinaryIO], timecodes: Optional[TextIO], ctx: RenderContext) -> None: | |
""" | |
Stolen from lvsfunc | |
Output a frame. | |
:param outfile: Output IO handle for Y4MPEG | |
:param timecodes: Output IO handle for timecodesv2 | |
:param ctx: Rendering context | |
""" | |
if timecodes: | |
timecodes.write(f"{round(ctx.timecodes[ctx.frames_rendered]*1000):d}\n") | |
if outfile is None: | |
return | |
f: vs.VideoFrame = ctx.frames[ctx.frames_rendered] | |
outfile.write("FRAME\n".encode("utf-8")) | |
for i, p in enumerate(f.planes()): | |
if f.get_stride(i) != p.width * f.format.bytes_per_sample: | |
outfile.write(bytes(p)) # type: ignore | |
else: | |
outfile.write(p) # type: ignore | |
def screengen( | |
clip: vs.VideoNode, | |
folder: str, | |
suffix: str, | |
frame_numbers: List = None, | |
start: int = 1): | |
""" | |
Stoled from Sea | |
Mod of Narkyy's screenshot generator, stolen from awsmfunc. | |
Generates screenshots from a list of frames. | |
Not specifying `frame_numbers` will use `ssfunc.util.lazylist()` to generate a list of frames. | |
:param folder: Name of folder where screenshots are saved. | |
:param suffix: Name prepended to screenshots (usually group name). | |
:param frame_numbers: List of frames. Either a list or an external file. | |
:param start: Frame to start from. | |
:param delim: Delimiter for the external file. | |
> Usage: ScreenGen(src, "Screenshots", "a") | |
ScreenGen(enc, "Screenshots", "b") | |
""" | |
folder_path = "./{name}".format(name=folder) | |
if not os.path.isdir(folder_path): | |
os.mkdir(folder_path) | |
for i, num in enumerate(frame_numbers, start=start): | |
filename = "{path}/{:03d} - {suffix}.png".format( | |
i, path=folder_path, suffix=suffix | |
) | |
matrix = clip.get_frame(0).props._Matrix | |
if matrix == 2: | |
matrix = 1 | |
print(f"Saving Frame {i}/{len(frame_numbers)} from {suffix}", end="\r") | |
vs.core.imwri.Write( | |
clip.resize.Spline36( | |
format=vs.RGB24, matrix_in=matrix, dither_type="error_diffusion" | |
), | |
"PNG", | |
filename, | |
overwrite=True, | |
).get_frame(num) | |
def get_highest_res(files: List[str]) -> int: | |
height = 0 | |
for f in files: | |
video = vs.core.lsmas.LWLibavSource(f) | |
if height < video.height: | |
height = video.height | |
return height | |
def get_frames(clip: vs.VideoNode, frames: List[int]) -> vs.VideoNode: | |
out = clip[frames[0]] | |
for i in frames[1:]: | |
out += clip[i] | |
return out | |
def actual_script(): | |
files = sorted([f for f in os.listdir('.') if f.endswith('.mkv') or f.endswith('.m2ts') or f.endswith('.mp4')]) | |
if len(files) < 2: | |
print("Not enough video files found.") | |
time.sleep(3) | |
exit | |
print('Files found: ') | |
for f in files: | |
if trim_dict.get(files.index(f)) is not None: | |
print(f + f" (Will be trimmed to start at frame {trim_dict.get(files.index(f))})") | |
else: | |
print(f) | |
print('\n') | |
dict = ani.parse(files[0]) | |
collection_name = dict.get('anime_title') if dict.get('anime_title') is not None else dict.get('episode_title') | |
first = vs.core.lsmas.LWLibavSource(files[0]) | |
if trim_dict.get(0) is not None: | |
first = first[trim_dict.get(0):] | |
frames = lazylist(first, frame_count_dark, frame_count_bright) | |
print(frames) | |
print("\n") | |
if upscale: | |
max_height = get_highest_res(files) | |
for file in files: | |
dict = ani.parse(file) | |
suffix = "" | |
if dict.get('release_group') is not None: | |
suffix = str(dict.get('release_group')).replace("[\\/:\"*?<>|]+", "") | |
if not suffix: | |
suffix = file.replace("[\\/:\"*?<>|]+", "") | |
print(f"Generating screens for {file}") | |
clip = vs.core.lsmas.LWLibavSource(file) | |
index = files.index(file) | |
if trim_dict.get(index) is not None: | |
clip = clip[trim_dict.get(index):] | |
if upscale and clip.height < max_height: | |
clip = vs.core.resize.Spline36(clip, clip.width * (max_height / clip.height), max_height) | |
if ffmpeg: | |
import subprocess | |
matrix = clip.get_frame(0).props._Matrix | |
if matrix == 2: | |
matrix = 1 | |
clip = clip.resize.Bicubic(format=vs.RGB24, matrix_in=matrix, dither_type="error_diffusion") | |
clip = get_frames(clip, frames) | |
clip = clip.std.ShufflePlanes([1, 2, 0], vs.RGB).std.AssumeFPS(fpsnum=1, fpsden=1) | |
if not os.path.isdir("./screens"): | |
os.mkdir("./screens") | |
path_images = [ | |
"{path}/{:03d} - {suffix}.png".format(frames.index(f) + 1, path="./screens", suffix=suffix) | |
for f in frames | |
] | |
print(path_images) | |
for i, path_image in enumerate(path_images): | |
ffmpeg_line = f"ffmpeg -y -hide_banner -loglevel error -f rawvideo -video_size {clip.width}x{clip.height} -pixel_format gbrp -framerate {str(clip.fps)} -i pipe: -pred mixed -ss {i} -t 1 \"{path_image}\"" | |
try: | |
with subprocess.Popen(ffmpeg_line, stdin=subprocess.PIPE) as process: | |
clip.output(cast(BinaryIO, process.stdin), y4m=False) | |
except: | |
None | |
else: | |
screengen(clip, "screens", suffix, frames) | |
print("\n") | |
if slowpics: | |
time.sleep(3) | |
fields: Dict[str, Any] = { | |
'collectionName': collection_name, | |
'public': 'false', | |
'optimize-images': 'true' | |
} | |
all_image_files = sorted([f for f in os.listdir('./screens/') if f.endswith('.png')]) | |
# Screen Number not dynamic yet so dont care about this being hardcoded | |
for x in range(frame_count_dark + frame_count_bright): | |
formatted_num = "{:03d}".format(x + 1) | |
current_comp = [f for f in all_image_files if f.startswith(formatted_num + " - ")] | |
print(f"Comp {formatted_num}: {current_comp}") | |
fields[f'comparisons[{x}].name'] = formatted_num | |
for imageName in current_comp: | |
i = current_comp.index(imageName) | |
image = pathlib.Path(f'./screens/{imageName}') | |
fields[f'comparisons[{x}].images[{i}].name'] = image.name.split(' - ', 1)[1] | |
fields[f'comparisons[{x}].images[{i}].file'] = (image.name, image.read_bytes(), 'image/png') | |
sess = Session() | |
sess.get('https://slow.pics/api/comparison') | |
# TODO: yeet this | |
files = MultipartEncoder(fields) | |
print('\nUploading images...') | |
url = sess.post( | |
'https://slow.pics/api/comparison', data=files.to_string(), | |
headers=_get_slowpics_header(str(files.len), files.content_type, sess) | |
) | |
sess.close() | |
slowpics_url = f'https://slow.pics/c/{url.text}' | |
print(f'Slowpics url: {slowpics_url}') | |
pc.copy(slowpics_url) | |
if webhook_url: | |
data = {"content": slowpics_url} | |
if requests.post(webhook_url, data).status_code < 300: | |
print('Posted to webhook.') | |
else: | |
print('Failed to post on webhook!') | |
time.sleep(3) | |
actual_script() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment