|
# enhance_video.py — adds --stub-ui and robust Waifu2x error handling |
|
from __future__ import annotations |
|
import shlex, shutil, subprocess, sys |
|
from pathlib import Path |
|
from typing import List, Optional |
|
|
|
import typer |
|
from loguru import logger |
|
from pydantic import BaseModel, Field, field_validator |
|
from rich.console import Console |
|
from rich.progress import Progress, BarColumn, TimeElapsedColumn, TimeRemainingColumn |
|
|
|
console = Console() |
|
app = typer.Typer(no_args_is_help=True) |
|
CRF = 18 |
|
|
|
# ───────── MODELS ───────── |
|
class WaifuCfg(BaseModel): |
|
alias: str = "cunet_anime" |
|
custom_id: Optional[str] = None |
|
noise: int = Field(0, ge=0, le=3) |
|
batch: int = 10 |
|
|
|
def flags(self, w: int, h: int, scale: int) -> list[str]: |
|
f = [ |
|
"waifu2x", |
|
"--stub-ui", # ← crash-workaround |
|
"--raw", |
|
"-m", |
|
self.alias, |
|
"-s", |
|
str(scale), |
|
"--width", |
|
str(w), |
|
"--height", |
|
str(h), |
|
"--cml-batch-size", |
|
str(self.batch), |
|
"--force-gpu", |
|
] |
|
if self.custom_id: |
|
f += ["--model-id", self.custom_id] |
|
if self.noise: |
|
f += ["-n", str(self.noise)] |
|
return f |
|
|
|
|
|
class Job(BaseModel): |
|
src: Path |
|
dst: Path |
|
k: int |
|
fps: int |
|
waifu: WaifuCfg |
|
encoder: str = "libx264" |
|
rife: Optional[Path] = None |
|
scale_chain: List[int] = [] |
|
interp: int = 2 |
|
|
|
@field_validator("src") |
|
@classmethod |
|
def exists(cls, v: Path): |
|
if not v.exists(): |
|
raise ValueError(f"{v} not found") |
|
return v |
|
|
|
|
|
# ───────── helpers ───────── |
|
def c(cmd): logger.debug(" ".join(map(shlex.quote, cmd))) |
|
def pipe(cmd, stdin=None): |
|
c(cmd) |
|
return subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
|
|
def probe(p: Path): |
|
w, h, r = subprocess.check_output( |
|
["ffprobe","-v","error","-select_streams","v:0", |
|
"-show_entries","stream=width,height,avg_frame_rate","-of","csv=p=0",str(p)] |
|
).decode().strip().split(",") |
|
n,d = map(int,r.split("/")); return int(w),int(h),(n/d if d else 30) |
|
|
|
def chain(k:int,h:int): |
|
tgt={1:720,2:1080,4:2160,8:4320,12:6480}[k]; cur=1; ch=[] |
|
while h*cur<tgt and cur<8: ch.append(2); cur*=2 |
|
if h*cur<tgt: ch.append(2) |
|
return ch or [2] |
|
|
|
# ───────── core ───────── |
|
def run(job: Job): |
|
w,h,src_fps = probe(job.src) |
|
job.scale_chain = chain(job.k,h) |
|
job.interp = max(2, round(job.fps/src_fps)) |
|
console.rule("[cyan]Enhance"); console.print(f"{w}×{h}@{src_fps:.1f} → k{job.k} {job.fps}fps") |
|
|
|
with Progress("{task.description}",BarColumn(),TimeElapsedColumn(),TimeRemainingColumn(),console=console) as prog: |
|
tk = prog.add_task("waifu2x", total=len(job.scale_chain)) |
|
reader = pipe(["ffmpeg","-v","error","-i",str(job.src),"-f","rawvideo","-pix_fmt","rgba","-"]) |
|
stream = reader.stdout; cur_w,cur_h = w,h |
|
|
|
for s in job.scale_chain: |
|
waifu = pipe(job.waifu.flags(cur_w,cur_h,s), stream) |
|
waifu_stdout, waifu_stderr = waifu.communicate() |
|
if waifu.returncode != 0: |
|
console.print("[red]Waifu2x failed:[/]\n" + waifu_stderr.decode()) |
|
sys.exit(1) |
|
stream = subprocess.PIPE |
|
stream = waifu_stdout = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
|
stream.stdin.write(waifu_stdout) |
|
stream.stdin.close() |
|
cur_w*=s; cur_h*=s; prog.advance(tk) |
|
|
|
if job.rife and job.interp>1: |
|
stream=pipe([str(job.rife),"-i","-","-o","-","-f",str(job.interp),"-g","-1"], stream.stdout).stdout |
|
elif job.interp>1: |
|
stream=pipe( |
|
["ffmpeg","-v","error","-f","rawvideo","-pix_fmt","rgba", |
|
"-s",f"{cur_w}x{cur_h}","-r",f"{src_fps}","-i","-", |
|
"-vf",f"minterpolate=fps={job.fps}:mi_mode=mci", |
|
"-f","rawvideo","-pix_fmt","rgba","-"], stream.stdout).stdout |
|
|
|
enc=["ffmpeg","-v","error","-y","-f","rawvideo","-pix_fmt","rgba", |
|
"-s",f"{cur_w}x{cur_h}","-r",str(job.fps),"-i","-", |
|
"-c:v",job.encoder,"-pix_fmt","yuv420p","-crf",str(CRF),str(job.dst)] |
|
c(enc); subprocess.run(enc, check=True, stdin=stream) |
|
|
|
console.print(f"[green]✔ saved {job.dst}") |
|
|
|
# ───────── CLI ───────── |
|
@app.command() |
|
def clip( |
|
src: Path = typer.Argument(..., exists=True), |
|
k: int = typer.Option(4,"-k"), |
|
fps: int = typer.Option(60,"-f"), |
|
model: str = typer.Option("cunet_anime","-m"), |
|
model_id: Optional[str] = typer.Option(None,"-M"), |
|
noise: int = typer.Option(0,"-d",min=0,max=3), |
|
encoder: str = typer.Option("libx264","-e"), |
|
out: Optional[Path] = typer.Option(None,"-o"), |
|
): |
|
rife_bin = shutil.which("rife-ncnn-vulkan") |
|
job = Job( |
|
src=src, |
|
dst=out or src.with_name(f"{src.stem}_{k}K{fps}{src.suffix}"), |
|
k=k, fps=fps, encoder=encoder, |
|
waifu=WaifuCfg(alias=model, custom_id=model_id, noise=noise), |
|
rife=Path(rife_bin) if rife_bin else None |
|
) |
|
run(job) |
|
|
|
if __name__=="__main__": |
|
app() |