GVE + SimSIMD video deduplication CLI via https://gzn00417.github.io/GVE/
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Apache-2.0
GVE + SimSIMD video deduplication CLI via https://gzn00417.github.io/GVE/

Design highlights
- Embeddings: GVE (Qwen2.5-VL based), last-token pooled + ℓ2-normalized (bfloat16/float16), per the paper/model card.
- Test-time policy: 8 frames baseline, scaled with duration (16/32/48) for long videos; ~200 visual tokens per frame.
- Similarity: cosine via SimSIMD on CPU (distances -> similarities); no heavyweight ANN index.
- Scope: optimized for ≤ ~1k videos (global), plus segment-level partial duplicates at small/medium scale (streaming top-K).
- I/O: pathlib everywhere; embeddings persisted as .npy (float16) + JSONL metadata.
"""

from __future__ import annotations

import csv
import hashlib
import json
import math
import os
import time
import warnings
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import click
import numpy as np
import simsimd
import torch
import torch.nn.functional as F
from transformers import AutoModel, AutoProcessor

# TODO: remove/handle LLM nanny code below

# ---- Optional video backends
try:
    import decord  # type: ignore

    decord.bridge.set_bridge("torch")
    _DECORD_OK = True
except Exception:
    _DECORD_OK = False

try:
    import cv2  # type: ignore

    _CV2_OK = True
except Exception:
    _CV2_OK = False

# `process_vision_info` ships in the qwen_vl_utils helper package
# (pip install qwen-vl-utils), not in transformers itself; optional.
try:
    from qwen_vl_utils import process_vision_info  # type: ignore

    _HAS_PVI = True
except Exception:
    process_vision_info = None
    _HAS_PVI = False

# ------------------------------
# Constants & utilities
# ------------------------------
VIDEO_EXTS = {".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v"}

@dataclass
class VideoInfo:
    path: str
    size_bytes: int
    mtime: float
    width: Optional[int]
    height: Optional[int]
    fps: Optional[float]
    duration_s: Optional[float]
    n_frames: Optional[int]


@dataclass
class EmbRow:
    id: int
    kind: str  # "global" | "segment"
    video_path: str
    video_id: str
    start_s: Optional[float]
    end_s: Optional[float]
    policy: str
    dims: int

class DSU:
    """Disjoint-set union with path halving and union by rank."""

    def __init__(self, n: int) -> None:
        self.p = list(range(n))
        self.r = [0] * n

    def find(self, x: int) -> int:
        while self.p[x] != x:
            self.p[x] = self.p[self.p[x]]  # path halving
            x = self.p[x]
        return x

    def union(self, a: int, b: int) -> None:
        ra, rb = self.find(a), self.find(b)
        if ra == rb:
            return
        if self.r[ra] < self.r[rb]:
            self.p[ra] = rb
        elif self.r[ra] > self.r[rb]:
            self.p[rb] = ra
        else:
            self.p[rb] = ra
            self.r[ra] += 1
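
# Quick DSU illustration: union(0, 1) and union(1, 2) put 0..2 in one set,
# leaving 3 alone.
#   d = DSU(4); d.union(0, 1); d.union(1, 2)
#   assert d.find(0) == d.find(2) and d.find(3) != d.find(0)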

def sha1_of_string(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()


def stable_video_id(p: Path) -> str:
    st = p.stat()
    payload = f"{p.resolve()}::{st.st_size}::{int(st.st_mtime)}"
    return sha1_of_string(payload)


def is_video_file(p: Path) -> bool:
    return p.suffix.lower() in VIDEO_EXTS


def ensure_outdir(path: Path) -> None:
    path.mkdir(parents=True, exist_ok=True)


def walk_videos(root: Path, glob: str) -> List[Path]:
    return [p for p in root.rglob(glob) if p.is_file() and is_video_file(p)]

# ------------------------------
# Video probing & sampling
# ------------------------------
def probe_video(p: Path) -> VideoInfo:
    width = height = None
    fps = None
    n_frames = None
    duration_s = None
    if _DECORD_OK:
        try:
            vr = decord.VideoReader(str(p))
            h, w = vr[0].shape[0], vr[0].shape[1]
            width, height = int(w), int(h)
            n_frames = int(len(vr))
            fps = float(vr.get_avg_fps()) if hasattr(vr, "get_avg_fps") else None
            if fps and n_frames:
                duration_s = n_frames / fps
        except Exception:
            pass
    if duration_s is None and _CV2_OK:
        try:
            cap = cv2.VideoCapture(str(p))
            if cap.isOpened():
                n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None
                fps_val = cap.get(cv2.CAP_PROP_FPS)
                fps = float(fps_val) if fps_val and fps_val > 1e-6 else None
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or None
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or None
                if fps and n_frames:
                    duration_s = n_frames / fps
            cap.release()
        except Exception:
            pass
    st = p.stat()
    return VideoInfo(
        path=str(p),
        size_bytes=st.st_size,
        mtime=st.st_mtime,
        width=width,
        height=height,
        fps=fps,
        duration_s=duration_s,
        n_frames=n_frames,
    )

def auto_frames_for_duration(duration_s: Optional[float]) -> int:
    """
    Paper-informed "auto" test-time scaling:
    <= 5 min: 8; 5-30 min: 16; 30-60 min: 32; > 60 min: 48.
    GVE reports gains on long videos from more frames; keep ~200 tokens/frame.
    """
    if duration_s is None:
        return 8
    m = duration_s / 60.0
    if m <= 5:
        return 8
    if m <= 30:
        return 16
    if m <= 60:
        return 32
    return 48
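
# Mapping check: auto_frames_for_duration(120.0) -> 8 (2 min),
# auto_frames_for_duration(1200.0) -> 16 (20 min),
# auto_frames_for_duration(5400.0) -> 48 (90 min).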

def sample_frame_indices(n_total: int, n_wanted: int) -> List[int]:
    if n_total <= 0 or n_wanted <= 0:
        return []
    n = min(n_total, n_wanted)
    if n == n_total:
        return list(range(n_total))
    return [int(i * (n_total - 1) / (n - 1)) for i in range(n)]
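
# Evenly spaced sampling example: sample_frame_indices(100, 5) -> [0, 24, 49, 74, 99]
# (the first and last frames are always included when n_wanted >= 2).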

def read_frames_as_numpy(
    p: Path, n_frames: int, start_frac: float = 0.0, end_frac: float = 1.0
) -> List[np.ndarray]:
    frames: List[np.ndarray] = []
    if _DECORD_OK:
        try:
            vr = decord.VideoReader(str(p))
            total = len(vr)
            s_idx = int(total * start_frac)
            e_idx = max(s_idx + 1, int(total * end_frac))
            idxs = sample_frame_indices(e_idx - s_idx, n_frames)
            idxs = [s_idx + k for k in idxs]
            batch = vr.get_batch(idxs)  # (N, H, W, C), RGB, uint8
            # With the torch bridge active (set at import), get_batch returns a
            # torch.Tensor, which has no .asnumpy(); handle both bridge modes.
            if hasattr(batch, "asnumpy"):
                batch = batch.asnumpy()
            else:
                batch = batch.cpu().numpy()
            frames = [batch[i] for i in range(batch.shape[0])]
            return frames
        except Exception:
            pass
    if _CV2_OK:
        try:
            cap = cv2.VideoCapture(str(p))
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total and total > 0:
                s_idx = int(total * start_frac)
                e_idx = max(s_idx + 1, int(total * end_frac))
                idxs = sample_frame_indices(e_idx - s_idx, n_frames)
                idxs = [s_idx + k for k in idxs]
                for idx in idxs:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                    ok, bgr = cap.read()
                    if not ok or bgr is None:
                        continue
                    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
                    frames.append(rgb)
            cap.release()
        except Exception:
            pass
    return frames

# ------------------------------
# GVE embedder
# ------------------------------
class GVEEmbedder:
    """
    Minimal wrapper around GVE to produce normalized embeddings.
    Last-token pooling + L2 normalization, per the architecture notes.
    """

    def __init__(
        self,
        model_name: str = "Alibaba-NLP/GVE-3B",
        device: Optional[str] = None,
        dtype: str = "bfloat16",
        max_pixels: int = 200 * 28 * 28,  # ~200 visual tokens/frame; avoid >400, where quality can drop
        max_length: int = 1200,
    ) -> None:
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        if dtype == "bfloat16":
            torch_dtype = torch.bfloat16
        elif dtype == "float16":
            torch_dtype = torch.float16
        else:
            torch_dtype = torch.float32
        self.model = AutoModel.from_pretrained(
            model_name,
            trust_remote_code=True,
            device_map="auto" if self.device == "cuda" else None,
            low_cpu_mem_usage=True,
            torch_dtype=torch_dtype,
        )
        self.processor = AutoProcessor.from_pretrained(
            model_name,
            trust_remote_code=True,
            use_fast=True,
        )
        if hasattr(self.processor, "tokenizer"):
            self.processor.tokenizer.padding_side = "left"
        self.max_pixels = max_pixels
        self.max_length = max_length

    @torch.inference_mode()
    def embed_video_path(
        self,
        video_path: Path,
        n_frames: int,
        fps: Optional[float] = None,
        prompt: str = "Represent this video for retrieval/deduplication.",
        out_dtype: torch.dtype = torch.float16,
    ) -> np.ndarray:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": str(video_path),
                        "max_pixels": self.max_pixels,
                        "fps": fps if fps and fps > 0 else 1.0,
                        "max_frames": int(n_frames),
                    },
                    {"type": "text", "text": prompt},
                ],
            },
        ]
        texts = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs = None
        video_inputs = [str(video_path)]
        video_kwargs = {
            "max_pixels": self.max_pixels,
            "fps": fps if fps and fps > 0 else 1.0,
            "max_frames": int(n_frames),
        }
        if _HAS_PVI and process_vision_info is not None:
            try:
                image_inputs, video_inputs, video_kwargs = process_vision_info(
                    messages, return_video_kwargs=True
                )
            except Exception:
                pass
        inputs = self.processor(
            text=[texts],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
            **video_kwargs,
        ).to(self.device)
        outputs = self.model(**inputs)
        vec = outputs["last_hidden_state"][:, -1, :]
        vec = F.normalize(vec, p=2, dim=1)
        # Move to CPU, cast to out_dtype (float16 default), return as numpy
        return vec.squeeze(0).to("cpu", dtype=out_dtype).numpy()

    @torch.inference_mode()
    def embed_frame_bundle(
        self,
        frames: List[np.ndarray],
        prompt: str = "Represent this video segment for retrieval/deduplication.",
        out_dtype: torch.dtype = torch.float16,
    ) -> np.ndarray:
        content = [{"type": "image", "image": fr} for fr in frames]
        content.append({"type": "text", "text": prompt})
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": content},
        ]
        texts = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs = None
        if _HAS_PVI and process_vision_info is not None:
            try:
                image_inputs, _, _ = process_vision_info(
                    messages, return_video_kwargs=True
                )
            except Exception:
                image_inputs = list(frames)
        else:
            image_inputs = list(frames)
        inputs = self.processor(
            text=[texts],
            images=image_inputs,
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        ).to(self.device)
        outputs = self.model(**inputs)
        vec = outputs["last_hidden_state"][:, -1, :]
        vec = F.normalize(vec, p=2, dim=1)
        return vec.squeeze(0).to("cpu", dtype=out_dtype).numpy()
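
# Sanity-check sketch (illustrative; file names are hypothetical): because the
# embedder L2-normalizes, cosine similarity between two videos is a plain dot
# product of their vectors.
#   emb = GVEEmbedder()
#   a = emb.embed_video_path(Path("a.mp4"), n_frames=8)
#   b = emb.embed_video_path(Path("b.mp4"), n_frames=8)
#   sim = float(np.dot(a.astype(np.float32), b.astype(np.float32)))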

# ------------------------------
# Persistence helpers
# ------------------------------
def save_matrix_and_meta(
    outdir: Path, name: str, vectors: np.ndarray, rows: List[EmbRow]
) -> None:
    ensure_outdir(outdir)
    np.save(outdir / f"{name}_embeddings.npy", vectors.astype(np.float16))
    with (outdir / f"{name}_meta.jsonl").open("w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(asdict(r), ensure_ascii=False) + "\n")


def load_matrix_and_meta(outdir: Path, name: str) -> Tuple[np.ndarray, List[EmbRow]]:
    vecs = np.load(outdir / f"{name}_embeddings.npy")
    rows: List[EmbRow] = []
    with (outdir / f"{name}_meta.jsonl").open("r", encoding="utf-8") as f:
        for line in f:
            rows.append(EmbRow(**json.loads(line)))
    return vecs, rows
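
# Round-trip check (illustrative; "./index_dir" is a hypothetical path):
#   vecs, rows = load_matrix_and_meta(Path("./index_dir"), "global")
#   assert vecs.shape[0] == len(rows) and vecs.dtype == np.float16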

# ------------------------------
# SimSIMD-based search
# ------------------------------
def _dtype_string_for_simsimd(arr: np.ndarray) -> str:
    if arr.dtype == np.float16:
        return "float16"
    # NumPy has no native bfloat16 (comparing against np.bfloat16 would raise
    # AttributeError), so match by dtype name to cover ml_dtypes-style arrays.
    if arr.dtype.name == "bfloat16":
        return "bfloat16"
    return "float32"  # float32 and anything else

def simsimd_full_cosine_topk(
    X: np.ndarray, topk: int, threads: int = 0
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Full NxN cosine distances via SimSIMD, then convert to similarities and take topk per row.
    For N <= ~5k this is fine; here optimized for ≤ ~1k videos.
    """
    dtype_str = _dtype_string_for_simsimd(X)
    # distances in [0, 2]; cosine distance = 1 - cosine similarity
    D = simsimd.cdist(X, X, metric="cosine", threads=threads, dtype=dtype_str)
    D = np.array(D, copy=True)  # materialize DistancesTensor as an ndarray
    # self-distances -> +inf to exclude
    n = D.shape[0]
    np.fill_diagonal(D, np.inf)
    S = 1.0 - D  # similarity in [-1, 1]
    if topk >= n:
        idx = np.argsort(-S, axis=1)
        scr = np.take_along_axis(S, idx, axis=1)
        return idx, scr
    # argpartition gathers the topk largest similarities (unordered), then sort them
    idx_part = np.argpartition(-S, kth=topk - 1, axis=1)[:, :topk]
    row = np.arange(n)[:, None]
    scr_part = S[row, idx_part]
    order = np.argsort(-scr_part, axis=1)
    idx_sorted = idx_part[row, order]
    scr_sorted = scr_part[row, order]
    return idx_sorted, scr_sorted
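
# Memory sanity check for the full-matrix path: cdist materializes an N x N
# distance matrix (typically float64, 8 bytes/entry), i.e. N**2 * 8 bytes.
# N = 1,000 -> ~8 MB; N = 5,000 -> ~200 MB. Beyond that, prefer the
# streaming variant below.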

def simsimd_streaming_cosine_topk(
    X: np.ndarray,
    topk: int,
    threads: int = 0,
    block_rows: int = 1024,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Stream rows in blocks against full X to get topk neighbors.
    Useful for larger segment sets where full NxN wouldn't fit comfortably.
    """
    dtype_str = _dtype_string_for_simsimd(X)
    N = X.shape[0]
    topk = min(topk, N)  # clamp so the output buffers always match below
    idx_all = np.empty((N, topk), dtype=np.int32)
    scr_all = np.empty((N, topk), dtype=np.float32)
    for start in range(0, N, block_rows):
        end = min(N, start + block_rows)
        D = simsimd.cdist(
            X[start:end], X, metric="cosine", threads=threads, dtype=dtype_str
        )
        D = np.array(D, copy=True)
        # set self-distances to +inf for rows in this block
        for i in range(start, end):
            D[i - start, i] = np.inf
        S = 1.0 - D
        if topk >= N:
            idx_block = np.argsort(-S, axis=1)
            scr_block = np.take_along_axis(S, idx_block, axis=1)
        else:
            idx_part = np.argpartition(-S, kth=topk - 1, axis=1)[:, :topk]
            row = np.arange(S.shape[0])[:, None]
            scr_part = S[row, idx_part]
            order = np.argsort(-scr_part, axis=1)
            idx_block = idx_part[row, order]
            scr_block = scr_part[row, order]
        idx_all[start:end] = idx_block
        scr_all[start:end] = scr_block
    return idx_all, scr_all
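
# Peak memory per block is roughly block_rows * N * 8 bytes (distances,
# typically float64): e.g. 1,024 rows x 100,000 segments -> ~820 MB per block.
# Shrink block_rows to trade speed for memory.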

def build_edges_from_topk(
    idx: np.ndarray, scr: np.ndarray, threshold: float
) -> List[Tuple[int, int, float]]:
    """
    Form undirected edges (i < j) for pairs with similarity >= threshold.
    """
    N, K = idx.shape
    edges: Dict[Tuple[int, int], float] = {}
    for i in range(N):
        for jpos in range(K):
            j = int(idx[i, jpos])
            if j < 0 or j == i:
                continue
            s = float(scr[i, jpos])
            if s >= threshold:
                a, b = (i, j) if i < j else (j, i)
                prev = edges.get((a, b), -1.0)
                if s > prev:
                    edges[(a, b)] = s
    return [(a, b, s) for (a, b), s in edges.items()]
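
# Worked example: with one query row whose neighbors are ids [1, 2] at
# similarities [0.95, 0.40] and threshold 0.9, only the (0, 1) edge survives:
#   build_edges_from_topk(np.array([[1, 2]]), np.array([[0.95, 0.40]]), 0.9)
#   -> [(0, 1, 0.95)]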

# ------------------------------
# CLI
# ------------------------------
@click.group(context_settings=dict(help_option_names=["-h", "--help"]))
def cli() -> None:
    """GVE + SimSIMD video deduplication toolkit."""


@cli.command()
@click.argument("input_dir", type=click.Path(path_type=Path, exists=True))
@click.argument("index_dir", type=click.Path(path_type=Path))
@click.option("--model", default="Alibaba-NLP/GVE-3B", show_default=True)
@click.option("--device", default=None, help="torch device; default auto.")
@click.option(
    "--dtype",
    default="bfloat16",
    type=click.Choice(["bfloat16", "float16", "float32"]),
    show_default=True,
)
@click.option(
    "--max-pixels",
    default=200 * 28 * 28,
    show_default=True,
    type=int,
    help="~visual tokens per frame.",
)
@click.option(
    "--global-frames", default="auto", show_default=True, help='"auto" or integer.'
)
@click.option(
    "--segments/--no-segments",
    default=True,
    show_default=True,
    help="Compute segment-level embeddings.",
)
@click.option("--segment-seconds", default=60.0, show_default=True)
@click.option("--segment-stride-seconds", default=30.0, show_default=True)
@click.option("--frames-per-segment", default=8, show_default=True)
@click.option(
    "--glob", default="**/*", show_default=True, help="Glob pattern within input_dir."
)
def index(
    input_dir: Path,
    index_dir: Path,
    model: str,
    device: Optional[str],
    dtype: str,
    max_pixels: int,
    global_frames: str,
    segments: bool,
    segment_seconds: float,
    segment_stride_seconds: float,
    frames_per_segment: int,
    glob: str,
) -> None:
    """
    Build embeddings (global + optional segments) and save under INDEX_DIR.
    """
    t0 = time.time()
    embedder = GVEEmbedder(
        model_name=model, device=device, dtype=dtype, max_pixels=max_pixels
    )
    videos = walk_videos(input_dir, glob)
    videos.sort()
    click.echo(f"Found {len(videos)} videos. Indexing...")
    g_vecs: List[np.ndarray] = []
    g_rows: List[EmbRow] = []
    s_vecs: List[np.ndarray] = []
    s_rows: List[EmbRow] = []
    gid = 0
    sid = 0
    for vp in videos:
        info = probe_video(vp)
        vid = stable_video_id(vp)
        n_frames_glob = (
            auto_frames_for_duration(info.duration_s)
            if global_frames == "auto"
            else max(1, int(global_frames))
        )
        # Global embedding
        try:
            g = embedder.embed_video_path(
                vp, n_frames=n_frames_glob, fps=info.fps or 1.0, out_dtype=torch.float16
            )
            g_vecs.append(g)
            g_rows.append(
                EmbRow(
                    id=gid,
                    kind="global",
                    video_path=str(vp.resolve()),
                    video_id=vid,
                    start_s=None,
                    end_s=None,
                    policy=f"global-{n_frames_glob}",
                    dims=int(g.shape[0]),
                )
            )
            gid += 1
        except Exception as e:
            click.echo(f"[warn] Global embedding failed for {vp}: {e}", err=True)
            continue
        # Segment embeddings
        if (
            segments
            and info.duration_s
            and info.duration_s >= max(segment_seconds, segment_stride_seconds)
        ):
            total = info.duration_s
            start = 0.0
            while start < total:
                end = min(start + segment_seconds, total)
                s_frac = max(0.0, start / total)
                e_frac = min(1.0, end / total)
                frames = read_frames_as_numpy(
                    vp, n_frames=frames_per_segment, start_frac=s_frac, end_frac=e_frac
                )
                if len(frames) >= max(2, frames_per_segment // 2):
                    try:
                        s = embedder.embed_frame_bundle(frames, out_dtype=torch.float16)
                        s_vecs.append(s)
                        s_rows.append(
                            EmbRow(
                                id=sid,
                                kind="segment",
                                video_path=str(vp.resolve()),
                                video_id=vid,
                                start_s=float(start),
                                end_s=float(end),
                                policy=f"segments@{int(segment_seconds)}s_stride={int(segment_stride_seconds)}s_frames={frames_per_segment}",
                                dims=int(s.shape[0]),
                            )
                        )
                        sid += 1
                    except Exception as e:
                        click.echo(
                            f"[warn] Segment embedding failed for {vp} [{start:.1f}-{end:.1f}s]: {e}",
                            err=True,
                        )
                start += segment_stride_seconds
    outdir = Path(index_dir)
    ensure_outdir(outdir)
    if g_vecs:
        G = np.stack(g_vecs, axis=0)  # float16, L2-normalized
        save_matrix_and_meta(outdir, "global", G, g_rows)
        click.echo(f"Saved global embeddings: {G.shape} -> {outdir}")
    if s_vecs:
        S = np.stack(s_vecs, axis=0)
        save_matrix_and_meta(outdir, "segment", S, s_rows)
        click.echo(f"Saved segment embeddings: {S.shape} -> {outdir}")
    click.echo(f"Indexing done in {time.time() - t0:.1f}s")

@cli.command()
@click.argument("index_dir", type=click.Path(path_type=Path, exists=True))
@click.argument("report_csv", type=click.Path(path_type=Path))
@click.option(
    "--global-th",
    default=0.92,
    show_default=True,
    help="Cosine similarity threshold (global).",
)
@click.option(
    "--segment-th",
    default=0.88,
    show_default=True,
    help="Cosine similarity threshold (segments).",
)
@click.option(
    "--segment-min-matches",
    default=3,
    show_default=True,
    help="Count threshold to merge via segments.",
)
@click.option(
    "--segment-min-fraction",
    default=0.35,
    show_default=True,
    help="Fraction threshold to merge via segments.",
)
@click.option(
    "--topk", default=10, show_default=True, help="Neighbors to consider per item."
)
@click.option(
    "--segment-topk", default=10, show_default=True, help="Neighbors per segment item."
)
@click.option(
    "--threads", default=0, show_default=True, help="SimSIMD threads (0 = all cores)."
)
def dedupe(
    index_dir: Path,
    report_csv: Path,
    global_th: float,
    segment_th: float,
    segment_min_matches: int,
    segment_min_fraction: float,
    topk: int,
    segment_topk: int,
    threads: int,
) -> None:
    """
    Build duplicate clusters from stored embeddings; write a reviewable CSV report.
    """
    # Load global embeddings
    G, grows = load_matrix_and_meta(index_dir, "global")
    # Ensure float16 (vectors were saved already normalized)
    G = G.astype(np.float16)
    # Top-k with full NxN (≤ ~1k is fine)
    g_idx, g_scr = simsimd_full_cosine_topk(G, topk=topk, threads=threads)
    g_edges = build_edges_from_topk(g_idx, g_scr, threshold=global_th)
    dsu = DSU(len(grows))
    for a, b, s in g_edges:
        dsu.union(a, b)
    # Optional: segment-level partial matches
    seg_pairs: Dict[Tuple[str, str], int] = {}
    seg_total: Dict[str, int] = {}
    if (index_dir / "segment_embeddings.npy").exists() and (
        index_dir / "segment_meta.jsonl"
    ).exists():
        S, srows = load_matrix_and_meta(index_dir, "segment")
        S = S.astype(np.float16)
        s_idx, s_scr = simsimd_streaming_cosine_topk(
            S, topk=segment_topk, threads=threads, block_rows=1024
        )
        # accumulate cross-video matches
        for i in range(S.shape[0]):
            ri = srows[i]
            seg_total[ri.video_id] = seg_total.get(ri.video_id, 0) + 1
            for jpos in range(s_idx.shape[1]):
                j = int(s_idx[i, jpos])
                if j < 0:
                    continue
                rj = srows[j]
                if ri.video_id == rj.video_id:
                    continue
                if float(s_scr[i, jpos]) < segment_th:
                    continue
                a, b = sorted([ri.video_id, rj.video_id])
                seg_pairs[(a, b)] = seg_pairs.get((a, b), 0) + 1
        # Promote strong partial-dup pairs to clusters
        for (a, b), cnt in seg_pairs.items():
            fa = cnt / max(1, seg_total.get(a, 1))
            fb = cnt / max(1, seg_total.get(b, 1))
            if cnt >= segment_min_matches or max(fa, fb) >= segment_min_fraction:
                ia = next((i for i, r in enumerate(grows) if r.video_id == a), None)
                ib = next((i for i, r in enumerate(grows) if r.video_id == b), None)
                if ia is not None and ib is not None:
                    dsu.union(ia, ib)
    # Collect final groups
    groups: Dict[int, List[int]] = {}
    for i in range(len(grows)):
        root = dsu.find(i)
        groups.setdefault(root, []).append(i)
    # Keeper heuristic: prefer longer duration -> higher resolution -> larger file
    info_cache: Dict[str, VideoInfo] = {}

    def info_of(path_str: str) -> VideoInfo:
        if path_str not in info_cache:
            info_cache[path_str] = probe_video(Path(path_str))
        return info_cache[path_str]

    def keeper_key(i: int) -> Tuple[float, int, int]:
        r = grows[i]
        inf = info_of(r.video_path)
        dur = inf.duration_s or 0.0
        px = (inf.width or 0) * (inf.height or 0)
        size = inf.size_bytes
        return (dur, px, size)

    with report_csv.open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(
            [
                "group_id",
                "is_keeper",
                "reason",
                "video_path",
                "video_id",
                "duration_s",
                "resolution",
                "size_bytes",
            ]
        )
        for gid, members in enumerate(sorted(groups.values(), key=lambda xs: -len(xs))):
            best = max(members, key=keeper_key)
            reason = "global" if not seg_pairs else "global|partial"
            for m in members:
                r = grows[m]
                inf = info_of(r.video_path)
                w.writerow(
                    [
                        gid,
                        "yes" if m == best else "no",
                        reason,
                        r.video_path,
                        r.video_id,
                        f"{(inf.duration_s or 0.0):.2f}",
                        f"{inf.width or 0}x{inf.height or 0}",
                        inf.size_bytes,
                    ]
                )
    click.echo(f"Wrote report to {report_csv}")

@cli.command()
@click.argument("report_csv", type=click.Path(path_type=Path, exists=True))
@click.option(
    "--emit-script",
    type=click.Path(path_type=Path),
    help="Write a shell script with `mv -n` for non-keepers.",
)
@click.option(
    "--review-dir",
    type=click.Path(path_type=Path),
    help="Destination folder for non-keepers.",
)
def plan_actions(
    report_csv: Path, emit_script: Optional[Path], review_dir: Optional[Path]
) -> None:
    """
    Turn the report into a reviewable action plan (no deletes).
    """
    rows = []
    with report_csv.open("r", encoding="utf-8") as f:
        for r in csv.DictReader(f):
            rows.append(r)
    if not rows:
        click.echo("Nothing to do.")
        return
    if emit_script and review_dir:
        ensure_outdir(review_dir)
        lines = ["#!/usr/bin/env bash", "set -euo pipefail"]
        for r in rows:
            if r["is_keeper"].lower() != "yes":
                src = r["video_path"]
                dst = str(Path(review_dir) / Path(src).name)
                lines.append(f'echo "moving: {src} -> {dst}"')
                lines.append(f'mv -n "{src}" "{dst}"')
        Path(emit_script).write_text("\n".join(lines) + "\n", encoding="utf-8")
        os.chmod(emit_script, 0o755)
        click.echo(f"Wrote plan: {emit_script}")
    else:
        non_keepers = [r for r in rows if r["is_keeper"].lower() != "yes"]
        click.echo(
            f"{len(non_keepers)} files would be moved. Provide --emit-script and --review-dir to generate a plan."
        )
        for r in non_keepers[:20]:
            click.echo(f"[grp {r['group_id']}] {r['video_path']}")


if __name__ == "__main__":
    cli()