#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Apache-2.0
GVE + SimSIMD video deduplication CLI via https://gzn00417.github.io/GVE/
Design highlights
- Embeddings: GVE (Qwen2.5-VL based) last-token pooled + ℓ2-normalized (bf16/float16), per paper/model card.
- Test-time policy: 8 frames baseline, scale with duration (16/32/48) for long videos; ~200 visual tokens per frame.
- Similarity: cosine via SimSIMD on CPU (distances -> similarities), no heavyweight ANN index.
- Scope: Optimized for ≤ ~1k videos (global), and segment-level partial duplicates at small/medium scale (streaming top-K).
- I/O: Pathlib everywhere; embeddings persisted as .npy (float16) + JSONL metadata.
"""
from __future__ import annotations
import csv
import hashlib
import json
import os
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import click
import numpy as np
import simsimd
import torch
import torch.nn.functional as F
from transformers import AutoModel, AutoProcessor
# TODO: remove/handle LLM nanny code below
# ---- Optional video backends
try:
import decord # type: ignore
    # keep decord's default (numpy) bridge so get_batch(...).asnumpy() works below
_DECORD_OK = True
except Exception:
_DECORD_OK = False
try:
import cv2 # type: ignore
_CV2_OK = True
except Exception:
_CV2_OK = False
# qwen_vl_utils (not transformers) exposes this helper for Qwen2.5-VL-style inputs; optional
try:
    from qwen_vl_utils import process_vision_info  # type: ignore
    _HAS_PVI = True
except Exception:
    process_vision_info = None
    _HAS_PVI = False
# ------------------------------
# Constants & utilities
# ------------------------------
VIDEO_EXTS = {".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v"}
@dataclass
class VideoInfo:
path: str
size_bytes: int
mtime: float
width: Optional[int]
height: Optional[int]
fps: Optional[float]
duration_s: Optional[float]
n_frames: Optional[int]
@dataclass
class EmbRow:
id: int
kind: str # "global" | "segment"
video_path: str
video_id: str
start_s: Optional[float]
end_s: Optional[float]
policy: str
dims: int
class DSU:
def __init__(self, n: int) -> None:
self.p = list(range(n))
self.r = [0] * n
def find(self, x: int) -> int:
while self.p[x] != x:
self.p[x] = self.p[self.p[x]]
x = self.p[x]
return x
def union(self, a: int, b: int) -> None:
ra, rb = self.find(a), self.find(b)
if ra == rb:
return
if self.r[ra] < self.r[rb]:
self.p[ra] = rb
elif self.r[ra] > self.r[rb]:
self.p[rb] = ra
else:
self.p[rb] = ra
self.r[ra] += 1
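# DSU sketch: union-find with path halving + union by rank, e.g.
#   dsu = DSU(3); dsu.union(0, 1)
#   dsu.find(0) == dsu.find(1)  # True; item 2 stays in its own cluster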
def sha1_of_string(s: str) -> str:
return hashlib.sha1(s.encode("utf-8")).hexdigest()
def stable_video_id(p: Path) -> str:
st = p.stat()
payload = f"{p.resolve()}::{st.st_size}::{int(st.st_mtime)}"
return sha1_of_string(payload)
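# Note: the id hashes absolute path, size, and mtime, so moving, editing, or
# touching a file yields a new id for the same content.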
def is_video_file(p: Path) -> bool:
return p.suffix.lower() in VIDEO_EXTS
def ensure_outdir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def walk_videos(root: Path, glob: str) -> List[Path]:
return [p for p in root.rglob(glob) if p.is_file() and is_video_file(p)]
# ------------------------------
# Video probing & sampling
# ------------------------------
def probe_video(p: Path) -> VideoInfo:
width = height = None
fps = None
n_frames = None
duration_s = None
if _DECORD_OK:
try:
vr = decord.VideoReader(str(p))
h, w = vr[0].shape[0], vr[0].shape[1]
width, height = int(w), int(h)
n_frames = int(len(vr))
fps = float(vr.get_avg_fps()) if hasattr(vr, "get_avg_fps") else None
if fps and n_frames:
duration_s = n_frames / fps
except Exception:
pass
if duration_s is None and _CV2_OK:
try:
cap = cv2.VideoCapture(str(p))
if cap.isOpened():
n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None
fps_val = cap.get(cv2.CAP_PROP_FPS)
fps = float(fps_val) if fps_val and fps_val > 1e-6 else None
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or None
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or None
if fps and n_frames:
duration_s = n_frames / fps
cap.release()
except Exception:
pass
st = p.stat()
return VideoInfo(
path=str(p),
size_bytes=st.st_size,
mtime=st.st_mtime,
width=width,
height=height,
fps=fps,
duration_s=duration_s,
n_frames=n_frames,
)
def auto_frames_for_duration(duration_s: Optional[float]) -> int:
"""
Paper-informed "auto" test-time scaling:
<=5 min: 8; 5–30: 16; 30–60: 32; >60: 48.
    GVE reports gains on long-context videos from more frames; keep ~200 tokens/frame.
"""
if duration_s is None:
return 8
m = duration_s / 60.0
if m <= 5:
return 8
if m <= 30:
return 16
if m <= 60:
return 32
return 48
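# Examples of the policy above:
#   auto_frames_for_duration(None) == 8       (unknown duration -> baseline)
#   auto_frames_for_duration(10 * 60) == 16
#   auto_frames_for_duration(2 * 3600) == 48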
def sample_frame_indices(n_total: int, n_wanted: int) -> List[int]:
if n_total <= 0 or n_wanted <= 0:
return []
n = min(n_total, n_wanted)
if n == n_total:
return list(range(n_total))
return [int(i * (n_total - 1) / (n - 1)) for i in range(n)]
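# Example: sample_frame_indices(100, 4) -> [0, 33, 66, 99] (evenly spaced, endpoints included)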
def read_frames_as_numpy(
p: Path, n_frames: int, start_frac: float = 0.0, end_frac: float = 1.0
) -> List[np.ndarray]:
frames: List[np.ndarray] = []
if _DECORD_OK:
try:
vr = decord.VideoReader(str(p))
total = len(vr)
s_idx = int(total * start_frac)
e_idx = max(s_idx + 1, int(total * end_frac))
idxs = sample_frame_indices(e_idx - s_idx, n_frames)
idxs = [s_idx + k for k in idxs]
batch = vr.get_batch(idxs).asnumpy() # (N, H, W, C), RGB, uint8
frames = [batch[i] for i in range(batch.shape[0])]
return frames
except Exception:
pass
if _CV2_OK:
try:
cap = cv2.VideoCapture(str(p))
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total and total > 0:
s_idx = int(total * start_frac)
e_idx = max(s_idx + 1, int(total * end_frac))
idxs = sample_frame_indices(e_idx - s_idx, n_frames)
idxs = [s_idx + k for k in idxs]
for idx in idxs:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ok, bgr = cap.read()
if not ok or bgr is None:
continue
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
frames.append(rgb)
cap.release()
except Exception:
pass
return frames
# ------------------------------
# GVE embedder
# ------------------------------
class GVEEmbedder:
"""
Minimal wrapper around GVE to produce normalized embeddings.
    Last-token pooling + L2 normalization, per architecture notes.
"""
def __init__(
self,
model_name: str = "Alibaba-NLP/GVE-3B",
device: Optional[str] = None,
dtype: str = "bfloat16",
        max_pixels: int = 200 * 28 * 28,  # ~200 visual tokens/frame; avoid >400 where performance can drop.
max_length: int = 1200,
) -> None:
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
if dtype == "bfloat16":
torch_dtype = torch.bfloat16
elif dtype == "float16":
torch_dtype = torch.float16
else:
torch_dtype = torch.float32
self.model = AutoModel.from_pretrained(
model_name,
trust_remote_code=True,
device_map="auto" if self.device == "cuda" else None,
low_cpu_mem_usage=True,
torch_dtype=torch_dtype,
)
self.processor = AutoProcessor.from_pretrained(
model_name,
trust_remote_code=True,
use_fast=True,
)
if hasattr(self.processor, "tokenizer"):
self.processor.tokenizer.padding_side = "left"
self.max_pixels = max_pixels
self.max_length = max_length
@torch.inference_mode()
def embed_video_path(
self,
video_path: Path,
n_frames: int,
fps: Optional[float] = None,
prompt: str = "Represent this video for retrieval/deduplication.",
out_dtype: torch.dtype = torch.float16,
) -> np.ndarray:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": [
{
"type": "video",
"video": str(video_path),
"max_pixels": self.max_pixels,
"fps": fps if fps and fps > 0 else 1.0,
"max_frames": int(n_frames),
},
{"type": "text", "text": prompt},
],
},
]
texts = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs = None
video_inputs = [str(video_path)]
video_kwargs = {
"max_pixels": self.max_pixels,
"fps": fps if fps and fps > 0 else 1.0,
"max_frames": int(n_frames),
}
if _HAS_PVI and process_vision_info is not None:
try:
image_inputs, video_inputs, video_kwargs = process_vision_info(
messages, return_video_kwargs=True
)
except Exception:
pass
inputs = self.processor(
text=[texts],
images=image_inputs,
videos=video_inputs,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors="pt",
**video_kwargs,
).to(self.device)
outputs = self.model(**inputs)
vec = outputs["last_hidden_state"][:, -1, :]
vec = F.normalize(vec, p=2, dim=1)
# Move to CPU, cast to out_dtype (float16 default), return as numpy
return vec.squeeze(0).to("cpu", dtype=out_dtype).numpy()
@torch.inference_mode()
def embed_frame_bundle(
self,
frames: List[np.ndarray],
prompt: str = "Represent this video segment for retrieval/deduplication.",
out_dtype: torch.dtype = torch.float16,
) -> np.ndarray:
content = [{"type": "image", "image": fr} for fr in frames]
content.append({"type": "text", "text": prompt})
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": content},
]
texts = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs = None
if _HAS_PVI and process_vision_info is not None:
try:
image_inputs, _, _ = process_vision_info(
messages, return_video_kwargs=True
)
except Exception:
image_inputs = [fr for fr in frames]
else:
image_inputs = [fr for fr in frames]
inputs = self.processor(
text=[texts],
images=image_inputs,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors="pt",
).to(self.device)
outputs = self.model(**inputs)
vec = outputs["last_hidden_state"][:, -1, :]
vec = F.normalize(vec, p=2, dim=1)
return vec.squeeze(0).to("cpu", dtype=out_dtype).numpy()
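# Minimal usage sketch (assumes a GPU plus network access for the GVE-3B weights):
#   embedder = GVEEmbedder()
#   v = embedder.embed_video_path(Path("clip.mp4"), n_frames=8)  # (dims,) float16, unit-norm
#   frames = read_frames_as_numpy(Path("clip.mp4"), n_frames=8)
#   s = embedder.embed_frame_bundle(frames)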
# ------------------------------
# Persistence helpers
# ------------------------------
def save_matrix_and_meta(
outdir: Path, name: str, vectors: np.ndarray, rows: List[EmbRow]
) -> None:
ensure_outdir(outdir)
np.save(outdir / f"{name}_embeddings.npy", vectors.astype(np.float16))
with (outdir / f"{name}_meta.jsonl").open("w", encoding="utf-8") as f:
for r in rows:
f.write(json.dumps(asdict(r), ensure_ascii=False) + "\n")
def load_matrix_and_meta(outdir: Path, name: str) -> Tuple[np.ndarray, List[EmbRow]]:
vecs = np.load(outdir / f"{name}_embeddings.npy")
rows: List[EmbRow] = []
with (outdir / f"{name}_meta.jsonl").open("r", encoding="utf-8") as f:
for line in f:
rows.append(EmbRow(**json.loads(line)))
return vecs, rows
# ------------------------------
# SimSIMD-based search
# ------------------------------
def _dtype_string_for_simsimd(arr: np.ndarray) -> str:
    if arr.dtype == np.float16:
        return "float16"
    if arr.dtype == np.float32:
        return "float32"
    # numpy has no native bfloat16 dtype; anything else falls back to float32
    return "float32"
def simsimd_full_cosine_topk(
X: np.ndarray, topk: int, threads: int = 0
) -> Tuple[np.ndarray, np.ndarray]:
"""
Full NxN cosine distances via SimSIMD, then convert to similarities and take topk per row.
For N <= ~5k this is fine; here optimized for ≤ ~1k videos.
"""
dtype_str = _dtype_string_for_simsimd(X)
# distances in [0,2], cosine distance = 1 - cosine_similarity
D = simsimd.cdist(X, X, metric="cosine", threads=threads, dtype=dtype_str)
D = np.array(D, copy=True) # materialize DistancesTensor
# self distances -> +inf to exclude
n = D.shape[0]
np.fill_diagonal(D, np.inf)
S = 1.0 - D # similarity in [-1,1]
if topk >= n:
idx = np.argsort(-S, axis=1)
scr = np.take_along_axis(S, idx, axis=1)
return idx, scr
idx_part = np.argpartition(-S, kth=range(topk), axis=1)[:, :topk]
row = np.arange(n)[:, None]
scr_part = S[row, idx_part]
order = np.argsort(-scr_part, axis=1)
idx_sorted = idx_part[row, order]
scr_sorted = scr_part[row, order]
return idx_sorted, scr_sorted
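# Memory note: the full pass materializes an NxN distance matrix, i.e. ~1e6
# entries for N=1000 (a few MB), which is why this path targets <= ~1k videos.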
def simsimd_streaming_cosine_topk(
X: np.ndarray,
topk: int,
threads: int = 0,
block_rows: int = 1024,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Stream rows in blocks against full X to get topk neighbors.
Useful for larger segment sets where full NxN wouldn't fit comfortably.
"""
    dtype_str = _dtype_string_for_simsimd(X)
    N = X.shape[0]
    topk = min(topk, max(1, N - 1))  # clamp: at most N-1 non-self neighbors
    idx_all = np.empty((N, topk), dtype=np.int32)
    scr_all = np.empty((N, topk), dtype=np.float32)
for start in range(0, N, block_rows):
end = min(N, start + block_rows)
D = simsimd.cdist(
X[start:end], X, metric="cosine", threads=threads, dtype=dtype_str
)
D = np.array(D, copy=True)
# set self-distances to +inf for rows in this block
for i in range(start, end):
D[i - start, i] = np.inf
S = 1.0 - D
if topk >= N:
idx_block = np.argsort(-S, axis=1)
scr_block = np.take_along_axis(S, idx_block, axis=1)
else:
idx_part = np.argpartition(-S, kth=range(topk), axis=1)[:, :topk]
row = np.arange(S.shape[0])[:, None]
scr_part = S[row, idx_part]
order = np.argsort(-scr_part, axis=1)
idx_block = idx_part[row, order]
scr_block = scr_part[row, order]
idx_all[start:end] = idx_block
scr_all[start:end] = scr_block
return idx_all, scr_all
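# Memory note: each block holds only block_rows x N distances (e.g. 1024 x N),
# so peak memory stays bounded as the number of segments N grows.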
def build_edges_from_topk(
idx: np.ndarray, scr: np.ndarray, threshold: float
) -> List[Tuple[int, int, float]]:
"""
Form undirected edges (i<j) for pairs with similarity >= threshold.
"""
N, K = idx.shape
edges: Dict[Tuple[int, int], float] = {}
for i in range(N):
for jpos in range(K):
j = int(idx[i, jpos])
if j < 0 or j == i:
continue
s = float(scr[i, jpos])
if s >= threshold:
a, b = (i, j) if i < j else (j, i)
if a == b:
continue
prev = edges.get((a, b), -1.0)
if s > prev:
edges[(a, b)] = s
return [(a, b, s) for (a, b), s in edges.items()]
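# Example: with threshold=0.9, idx=[[1], [0]] and scr=[[0.95], [0.95]] yield the
# single undirected edge (0, 1, 0.95); the dict keeps the max score per pair.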
# ------------------------------
# CLI
# ------------------------------
@click.group(context_settings=dict(help_option_names=["-h", "--help"]))
def cli() -> None:
"""GVE + SimSIMD video deduplication toolkit."""
@cli.command()
@click.argument("input_dir", type=click.Path(path_type=Path, exists=True))
@click.argument("index_dir", type=click.Path(path_type=Path))
@click.option("--model", default="Alibaba-NLP/GVE-3B", show_default=True)
@click.option("--device", default=None, help="torch device; default auto.")
@click.option(
"--dtype",
default="bfloat16",
type=click.Choice(["bfloat16", "float16", "float32"]),
show_default=True,
)
@click.option(
"--max-pixels",
default=200 * 28 * 28,
show_default=True,
type=int,
help="~visual tokens per frame.",
)
@click.option(
"--global-frames", default="auto", show_default=True, help='"auto" or integer.'
)
@click.option(
"--segments/--no-segments",
default=True,
show_default=True,
help="Compute segment-level embeddings.",
)
@click.option("--segment-seconds", default=60.0, show_default=True)
@click.option("--segment-stride-seconds", default=30.0, show_default=True)
@click.option("--frames-per-segment", default=8, show_default=True)
@click.option(
"--glob", default="**/*", show_default=True, help="Glob pattern within input_dir."
)
def index(
input_dir: Path,
index_dir: Path,
model: str,
device: Optional[str],
dtype: str,
max_pixels: int,
global_frames: str,
segments: bool,
segment_seconds: float,
segment_stride_seconds: float,
frames_per_segment: int,
glob: str,
) -> None:
"""
Build embeddings (global + optional segments) and save under INDEX_DIR.
"""
t0 = time.time()
embedder = GVEEmbedder(
model_name=model, device=device, dtype=dtype, max_pixels=max_pixels
)
videos = walk_videos(input_dir, glob)
videos.sort()
click.echo(f"Found {len(videos)} videos. Indexing...")
g_vecs: List[np.ndarray] = []
g_rows: List[EmbRow] = []
s_vecs: List[np.ndarray] = []
s_rows: List[EmbRow] = []
gid = 0
sid = 0
for vp in videos:
info = probe_video(vp)
vid = stable_video_id(vp)
n_frames_glob = (
auto_frames_for_duration(info.duration_s)
if global_frames == "auto"
else max(1, int(global_frames))
)
# Global embedding
try:
g = embedder.embed_video_path(
vp, n_frames=n_frames_glob, fps=info.fps or 1.0, out_dtype=torch.float16
)
g_vecs.append(g)
g_rows.append(
EmbRow(
id=gid,
kind="global",
video_path=str(vp.resolve()),
video_id=vid,
start_s=None,
end_s=None,
policy=f"global-{n_frames_glob}",
dims=int(g.shape[0]),
)
)
gid += 1
except Exception as e:
click.echo(f"[warn] Global embedding failed for {vp}: {e}", err=True)
continue
# Segment embeddings
if (
segments
and info.duration_s
and info.duration_s >= max(segment_seconds, segment_stride_seconds)
):
total = info.duration_s
start = 0.0
while start < total:
end = min(start + segment_seconds, total)
s_frac = max(0.0, start / total)
e_frac = min(1.0, end / total)
frames = read_frames_as_numpy(
vp, n_frames=frames_per_segment, start_frac=s_frac, end_frac=e_frac
)
if len(frames) >= max(2, frames_per_segment // 2):
try:
s = embedder.embed_frame_bundle(frames, out_dtype=torch.float16)
s_vecs.append(s)
s_rows.append(
EmbRow(
id=sid,
kind="segment",
video_path=str(vp.resolve()),
video_id=vid,
start_s=float(start),
end_s=float(end),
policy=f"segments@{int(segment_seconds)}s_stride={int(segment_stride_seconds)}s_frames={frames_per_segment}",
dims=int(s.shape[0]),
)
)
sid += 1
except Exception as e:
click.echo(
f"[warn] Segment embedding failed for {vp} [{start:.1f}-{end:.1f}s]: {e}",
err=True,
)
start += segment_stride_seconds
outdir = Path(index_dir)
ensure_outdir(outdir)
if g_vecs:
G = np.stack(g_vecs, axis=0) # float16 normalized
save_matrix_and_meta(outdir, "global", G, g_rows)
click.echo(f"Saved global embeddings: {G.shape} -> {outdir}")
if s_vecs:
S = np.stack(s_vecs, axis=0)
save_matrix_and_meta(outdir, "segment", S, s_rows)
click.echo(f"Saved segment embeddings: {S.shape} -> {outdir}")
click.echo(f"Indexing done in {time.time() - t0:.1f}s")
@cli.command()
@click.argument("index_dir", type=click.Path(path_type=Path, exists=True))
@click.argument("report_csv", type=click.Path(path_type=Path))
@click.option(
"--global-th",
default=0.92,
show_default=True,
help="Cosine similarity threshold (global).",
)
@click.option(
"--segment-th",
default=0.88,
show_default=True,
help="Cosine similarity threshold (segments).",
)
@click.option(
"--segment-min-matches",
default=3,
show_default=True,
help="Count threshold to merge via segments.",
)
@click.option(
"--segment-min-fraction",
default=0.35,
show_default=True,
help="Fraction threshold to merge via segments.",
)
@click.option(
"--topk", default=10, show_default=True, help="Neighbors to consider per item."
)
@click.option(
"--segment-topk", default=10, show_default=True, help="Neighbors per segment item."
)
@click.option(
"--threads", default=0, show_default=True, help="SimSIMD threads (0 = all cores)."
)
def dedupe(
index_dir: Path,
report_csv: Path,
global_th: float,
segment_th: float,
segment_min_matches: int,
segment_min_fraction: float,
topk: int,
segment_topk: int,
threads: int,
) -> None:
"""
Build duplicate clusters from stored embeddings; write a reviewable CSV report.
"""
# Load global
G, grows = load_matrix_and_meta(index_dir, "global")
# Ensure normalized (should be already)
G = G.astype(np.float16)
# Top-k with full NxN (≤ ~1k is fine)
g_idx, g_scr = simsimd_full_cosine_topk(G, topk=topk, threads=threads)
g_edges = build_edges_from_topk(g_idx, g_scr, threshold=global_th)
dsu = DSU(len(grows))
for a, b, s in g_edges:
dsu.union(a, b)
# Optional: segment-level partial matches
seg_pairs: Dict[Tuple[str, str], int] = {}
seg_total: Dict[str, int] = {}
if (index_dir / "segment_embeddings.npy").exists() and (
index_dir / "segment_meta.jsonl"
).exists():
S, srows = load_matrix_and_meta(index_dir, "segment")
S = S.astype(np.float16)
s_idx, s_scr = simsimd_streaming_cosine_topk(
S, topk=segment_topk, threads=threads, block_rows=1024
)
# accumulate cross-video matches
for i in range(S.shape[0]):
ri = srows[i]
seg_total[ri.video_id] = seg_total.get(ri.video_id, 0) + 1
for jpos in range(s_idx.shape[1]):
j = int(s_idx[i, jpos])
if j < 0:
continue
rj = srows[j]
if ri.video_id == rj.video_id:
continue
if float(s_scr[i, jpos]) < segment_th:
continue
a, b = sorted([ri.video_id, rj.video_id])
seg_pairs[(a, b)] = seg_pairs.get((a, b), 0) + 1
# Promote strong partial-dup pairs to clusters
for (a, b), cnt in seg_pairs.items():
fa = cnt / max(1, seg_total.get(a, 1))
fb = cnt / max(1, seg_total.get(b, 1))
if cnt >= segment_min_matches or max(fa, fb) >= segment_min_fraction:
ia = next((i for i, r in enumerate(grows) if r.video_id == a), None)
ib = next((i for i, r in enumerate(grows) if r.video_id == b), None)
if ia is not None and ib is not None:
dsu.union(ia, ib)
# Collect final groups
groups: Dict[int, List[int]] = {}
for i in range(len(grows)):
root = dsu.find(i)
groups.setdefault(root, []).append(i)
# Keeper heuristic: prefer longer duration -> higher resolution -> larger file
info_cache: Dict[str, VideoInfo] = {}
def info_of(path_str: str) -> VideoInfo:
if path_str not in info_cache:
info_cache[path_str] = probe_video(Path(path_str))
return info_cache[path_str]
def keeper_key(i: int) -> Tuple[float, int, int]:
r = grows[i]
inf = info_of(r.video_path)
dur = inf.duration_s or 0.0
px = (inf.width or 0) * (inf.height or 0)
size = inf.size_bytes
return (dur, px, size)
with report_csv.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(
[
"group_id",
"is_keeper",
"reason",
"video_path",
"video_id",
"duration_s",
"resolution",
"size_bytes",
]
)
for gid, members in enumerate(sorted(groups.values(), key=lambda xs: -len(xs))):
best = max(members, key=keeper_key)
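            # coarse label: segment evidence is tracked corpus-wide, not per-group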
reason = "global" if not seg_pairs else "global|partial"
for m in members:
r = grows[m]
inf = info_of(r.video_path)
w.writerow(
[
gid,
"yes" if m == best else "no",
reason,
r.video_path,
r.video_id,
f"{(inf.duration_s or 0.0):.2f}",
f"{inf.width or 0}x{inf.height or 0}",
inf.size_bytes,
]
)
click.echo(f"Wrote report to {report_csv}")
@cli.command()
@click.argument("report_csv", type=click.Path(path_type=Path, exists=True))
@click.option(
"--emit-script",
type=click.Path(path_type=Path),
help="Write a shell script with `mv -n` for non-keepers.",
)
@click.option(
"--review-dir",
type=click.Path(path_type=Path),
help="Destination folder for non-keepers.",
)
def plan_actions(
report_csv: Path, emit_script: Optional[Path], review_dir: Optional[Path]
) -> None:
"""
Turn the report into a reviewable action plan (no deletes).
"""
rows = []
with report_csv.open("r", encoding="utf-8") as f:
for r in csv.DictReader(f):
rows.append(r)
if not rows:
click.echo("Nothing to do.")
return
if emit_script and review_dir:
ensure_outdir(review_dir)
lines = ["#!/usr/bin/env bash", "set -euo pipefail"]
for r in rows:
if r["is_keeper"].lower() != "yes":
src = r["video_path"]
dst = str(Path(review_dir) / Path(src).name)
lines.append(f'echo "moving: {src} -> {dst}"')
lines.append(f'mv -n "{src}" "{dst}"')
Path(emit_script).write_text("\n".join(lines) + "\n", encoding="utf-8")
os.chmod(emit_script, 0o755)
click.echo(f"Wrote plan: {emit_script}")
else:
non_keepers = [r for r in rows if r["is_keeper"].lower() != "yes"]
click.echo(
f"{len(non_keepers)} files would be moved. Provide --emit-script and --review-dir to generate a plan."
)
for r in non_keepers[:20]:
click.echo(f"[grp {r['group_id']}] {r['video_path']}")
if __name__ == "__main__":
cli()