#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
End-to-end utility for Groq Whisper:
  1) Silence-based split (ffmpeg silencedetect)
  2) Re-merge adjacent parts under a size limit (default 25 MB)
  3) Transcribe each merged audio via Groq Whisper Large v3
  4) Save per-chunk TXT/JSON and merge into a single TXT
Defaults tuned for your workflow:
  --noise -35, --duration 3, --target-mb 25
Dependencies:
  - ffmpeg, ffprobe (brew install ffmpeg)
  - pip install groq
  - export GROQ_API_KEY=...
Usage example:
  python split_merge_transcribe_groq.py audio/74_legend_2024-11-30.mp3 --language ja
"""
import argparse
import json
import os
import re
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional
# ===== Directories (clean names) =====
DEFAULT_SOURCE_DIR = "cache/audio_source"
DEFAULT_PARTS_DIR = "cache/audio_parts"
DEFAULT_MERGED_DIR = "cache/audio_merged"
DEFAULT_TEXT_DIR = "cache/text_parts"
DEFAULT_FINAL_DIR = "text"
# ===== Model =====
MODEL_NAME = "whisper-large-v3"  # as requested
@dataclass
class Args:
    input: Path
    source_dir: Path
    noise: float
    duration: float
    target_mb: float
    safety_mb: float
    parts_dir: Path
    merged_dir: Path
    text_dir: Path
    final_text_dir: Optional[Path]
    auto_final_dir: bool
    language: Optional[str]
    prompt: Optional[str]
    max_retries: int
    retry_wait: float
def run(cmd: List[str], quiet=False):
    if not quiet:
        print("+", " ".join(shlex.quote(c) for c in cmd), flush=True)
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if proc.returncode != 0:
        sys.stderr.write(proc.stderr[:1500] + ("\n...truncated...\n" if len(proc.stderr) > 1500 else ""))
        raise RuntimeError(f"Command failed: {' '.join(cmd)}")
    return proc
def ffprobe_duration(path: Path) -> float:
    proc = run(["ffprobe","-v","error","-show_entries","format=duration","-of","default=noprint_wrappers=1:nokey=1",str(path)], quiet=True)
    return float(proc.stdout.strip())
def has_audio_stream(path: Path) -> bool:
    """
    Returns True if the media file contains an audio stream.
    """
    proc = run([
        "ffprobe","-v","error",
        "-select_streams","a:0",
        "-show_entries","stream=index",
        "-of","csv=p=0",
        str(path)
    ], quiet=True)
    return bool(proc.stdout.strip())
def derive_final_dir_from_input(input_path: Path) -> Path:
    """
    Derive the final text output directory from the input file's parent
    directory, mapping path components as follows:
      - `video` -> `transcript`
      - `audio` -> `text`
    Examples:
      /foo/bar/video/baz.mp4 -> /foo/bar/transcript
      /foo/bar/audio/baz.mp3 -> /foo/bar/text
    If neither component is found, return the default `text` resolved from
    the current directory.
    """
    parent_dir = input_path.parent
    parts = list(parent_dir.parts)
    replaced = False
    new_parts: List[str] = []
    for part in parts:
        if part == "video":
            new_parts.append("transcript")
            replaced = True
        elif part == "audio":
            new_parts.append("text")
            replaced = True
        else:
            new_parts.append(part)
    if replaced and new_parts:
        return Path(*new_parts)
    # Fallback: `text` under the current directory
    return Path(DEFAULT_FINAL_DIR).expanduser().resolve()
def derive_audio_dir_from_input(input_path: Path) -> Optional[Path]:
    """
    Derive the directory obtained by replacing the path component `video`
    with `audio` in the input file's parent directory. Returns None if
    `video` is not found.
    Example: /foo/bar/video/baz.mp4 -> /foo/bar/audio
    """
    parent_dir = input_path.parent
    parts = list(parent_dir.parts)
    replaced = False
    new_parts: List[str] = []
    for part in parts:
        if part == "video":
            new_parts.append("audio")
            replaced = True
        else:
            new_parts.append(part)
    if replaced and new_parts:
        return Path(*new_parts)
    return None
def clean_dir(path: Path):
    path.mkdir(parents=True, exist_ok=True)
    for p in list(path.iterdir()):
        if p.is_file():
            try:
                p.unlink()
            except Exception:
                pass
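# detect_silences() parses the silencedetect report that ffmpeg writes to
# stderr into (silence_start, silence_end) pairs; a trailing silence that
# never ends is kept as (start, None) and is ignored when cutting.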
def detect_silences(input_path: Path, noise_db: float, min_silence: float, log_path: Path) -> List[Tuple[float, float]]:
    cmd = ["ffmpeg","-hide_banner","-i",str(input_path),"-af",f"silencedetect=noise={noise_db}dB:d={min_silence}","-f","null","-"]
    print("Detecting silences...", flush=True)
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    log_path.write_text(proc.stderr, encoding="utf-8")
    starts, ends = [], []
    for line in proc.stderr.splitlines():
        if "silence_start:" in line:
            m = re.search(r"silence_start:\s*([0-9.]+)", line)
            if m:
                starts.append(float(m.group(1)))
        elif "silence_end:" in line:
            m = re.search(r"silence_end:\s*([0-9.]+)", line)
            if m:
                ends.append(float(m.group(1)))
    pairs = []
    i = j = 0
    while i < len(starts) or j < len(ends):
        if i < len(starts) and (j >= len(ends) or starts[i] < ends[j]):
            if j < len(ends):
                pairs.append((starts[i], ends[j]))
                i += 1
                j += 1
            else:
                pairs.append((starts[i], None))
                i += 1
        else:
            pairs.append((None, ends[j]))
            j += 1
    return pairs
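# cut_by_silence() cuts the source at every silence_end timestamp using
# stream copy (-c copy), so nothing is re-encoded; whatever remains after
# the last cut point becomes the final part.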
def cut_by_silence(input_path: Path, silences: List[Tuple[float, float]], outdir: Path) -> List[Path]:
    outdir.mkdir(parents=True, exist_ok=True)
    cut_points = sorted(set([end for (_, end) in silences if end is not None]))
    parts = []
    prev = 0.0
    stem = input_path.stem
    ext = input_path.suffix.lstrip(".")
    for idx, t in enumerate(cut_points, start=1):
        out = outdir / f"{stem}_part{idx}.{ext}"
        run(["ffmpeg","-hide_banner","-y","-i",str(input_path),"-ss",f"{prev}","-to",f"{t}","-c","copy",str(out)])
        parts.append(out)
        prev = t
    idx = len(cut_points) + 1
    out = outdir / f"{stem}_part{idx}.{ext}"
    run(["ffmpeg","-hide_banner","-y","-i",str(input_path),"-ss",f"{prev}","-c","copy",str(out)])
    parts.append(out)
    return parts
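# ensure_under_limit() recursively halves a chunk by duration until every
# piece is at or below target_bytes, keeping each upload within the size
# budget (25 MB minus the safety margin by default).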
def ensure_under_limit(path: Path, target_bytes: int) -> List[Path]:
    if path.stat().st_size <= target_bytes:
        return [path]
    print(f"Chunk {path.name} is {path.stat().st_size/1024/1024:.2f} MB > limit; splitting...", flush=True)
    duration = ffprobe_duration(path)
    half = duration / 2.0
    parent = path.parent
    stem = path.stem
    ext = path.suffix.lstrip(".")
    p1 = parent / f"{stem}_a.{ext}"
    p2 = parent / f"{stem}_b.{ext}"
    run(["ffmpeg","-hide_banner","-y","-i",str(path),"-t",f"{half}","-c","copy",str(p1)])
    run(["ffmpeg","-hide_banner","-y","-i",str(path),"-ss",f"{half}","-c","copy",str(p2)])
    try:
        path.unlink()
    except Exception:
        pass
    out = []
    for p in (p1, p2):
        out.extend(ensure_under_limit(p, target_bytes))
    return out
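# merge_under_limit() greedily packs consecutive parts into groups whose
# combined size stays under target_bytes, then joins each group with the
# ffmpeg concat demuxer (-safe 0 is needed because the list file contains
# absolute paths).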
def merge_under_limit(parts: List[Path], target_bytes: int, merged_dir: Path, base_stem: str) -> List[Path]:
    merged_dir.mkdir(parents=True, exist_ok=True)
    normalized = []
    for p in parts:
        normalized.extend(ensure_under_limit(p, target_bytes))
    groups = []
    cur, cur_size = [], 0
    for p in normalized:
        s = p.stat().st_size
        if not cur:
            cur = [p]
            cur_size = s
        elif cur_size + s <= target_bytes:
            cur.append(p)
            cur_size += s
        else:
            groups.append(cur)
            cur = [p]
            cur_size = s
    if cur:
        groups.append(cur)
    outputs = []
    for i, group in enumerate(groups, start=1):
        list_path = merged_dir / f"{base_stem}_group{i}.txt"
        with open(list_path, "w", encoding="utf-8") as f:
            for p in group:
                f.write(f"file '{p.as_posix()}'\n")
        out_path = merged_dir / f"{base_stem}_merged_{i}{group[0].suffix}"
        run(["ffmpeg","-hide_banner","-y","-f","concat","-safe","0","-i",str(list_path),"-c","copy",str(out_path)])
        outputs.append(out_path)
    return outputs
def has_video_stream(path: Path) -> bool:
    """
    Returns True if the media file contains a video stream.
    """
    proc = run([
        "ffprobe","-v","error",
        "-select_streams","v:0",
        "-show_entries","stream=index",
        "-of","csv=p=0",
        str(path)
    ], quiet=True)
    return bool(proc.stdout.strip())
def convert_to_mp3(input_path: Path, output_path: Path, bitrate_kbps: str = "128k") -> Path:
    """
    Extracts audio to MP3 using libmp3lame at the given bitrate.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    run(["ffmpeg","-hide_banner","-y","-i",str(input_path),"-vn","-acodec","libmp3lame","-ab",bitrate_kbps,str(output_path)])
    return output_path
def prepare_audio_input(input_path: Path, source_dir: Path) -> Path:
    """
    If the input contains video, extract audio to MP3 in `source_dir` and return that path.
    Otherwise, return the original input path.
    """
    try:
        if has_video_stream(input_path):
            # If the input path contains a 'video' component, prefer the
            # corresponding directory with 'video' replaced by 'audio'.
            derived_dir = derive_audio_dir_from_input(input_path)
            target_dir = derived_dir if derived_dir is not None else source_dir
            out_path = target_dir / f"{input_path.stem}.mp3"
            return convert_to_mp3(input_path, out_path, "128k")
    except Exception:
        # If probing fails, fall back to the original input
        pass
    return input_path
def extract_retry_after_seconds(message: str) -> Optional[float]:
    """
    Parse patterns like "Please try again in 8m10.422s" or "Please try again in 12.5s"
    and return the wait time in seconds as a float. If not found, return None.
    """
    try:
        m = re.search(r"Please try again in\s+(?:(\d+)m)?(\d+(?:\.\d+)?)s", message)
        if m:
            minutes = int(m.group(1)) if m.group(1) else 0
            seconds = float(m.group(2))
            return minutes * 60 + seconds
    except Exception:
        pass
    return None
def compute_backoff_seconds(error: Exception, default_wait: float) -> float:
    """
    For 429/rate-limit errors, prefer the server-suggested wait time; otherwise fall back to the default.
    """
    message = str(error)
    if "429" in message or "rate limit" in message.lower() or "rate_limit" in message.lower():
        parsed = extract_retry_after_seconds(message)
        if parsed is not None:
            # Add a small safety margin
            return max(5.0, min(parsed + 2.0, 3600.0))
        # Sensible default for rate limits when no hint is provided
        return max(60.0, default_wait)
    return max(1.0, default_wait)
def expand_media_inputs(input_path: Path) -> List[Path]:
    """
    If `input_path` is a file, return [input_path]. If it is a directory, return
    a list of immediate child files that are recognized as audio or video.
    (Non-recursive by design.)
    """
    if input_path.is_file():
        return [input_path]
    if input_path.is_dir():
        candidates: List[Path] = []
        for p in sorted(input_path.iterdir()):
            if p.is_file():
                try:
                    if has_video_stream(p) or has_audio_stream(p):
                        candidates.append(p)
                except Exception:
                    # Skip unreadable/unprobeable files
                    pass
        return candidates
    raise FileNotFoundError(f"Input path not found: {input_path}")
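# transcribe_groq() sends each merged chunk to the Groq transcription API
# (model whisper-large-v3, verbose_json response). Chunks whose .txt and
# .json outputs already exist are skipped, so an interrupted run can resume;
# failures are retried with a backoff derived from the server's rate-limit hint.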
def transcribe_groq(paths: List[Path], text_dir: Path, language: Optional[str], prompt: Optional[str], max_retries: int, retry_wait: float) -> List[Path]:
    # Lazy import
    try:
        from groq import Groq
    except Exception as e:
        raise RuntimeError("groq Python package is required. Install with: pip install groq") from e
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("Environment variable GROQ_API_KEY is not set.")
    client = Groq(api_key=api_key)
    text_dir.mkdir(parents=True, exist_ok=True)
    txt_paths = []
    for idx, p in enumerate(paths, start=1):
        base = p.stem
        txt_out = text_dir / f"{base}.txt"
        json_out = text_dir / f"{base}.json"
        if txt_out.exists() and json_out.exists():
            txt_paths.append(txt_out)
            continue
        print(f"Transcribing [{idx}/{len(paths)}]: {p.name}")
        last_err = None
        for attempt in range(1, max_retries + 1):
            try:
                with open(p, "rb") as f:
                    resp = client.audio.transcriptions.create(
                        file=(p.name, f.read(), "audio/mpeg"),
                        model=MODEL_NAME,
                        response_format="verbose_json",
                        language=language,
                        temperature=0,
                        prompt=prompt
                    )
                # Normalize resp
                try:
                    data = resp.to_dict() if hasattr(resp, "to_dict") else json.loads(resp.model_dump_json())
                except Exception:
                    data = {"text": getattr(resp, "text", "")}
                text = data.get("text") or ""
                json_out.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
                txt_out.write_text(text, encoding="utf-8")
                txt_paths.append(txt_out)
                break
            except Exception as e:
                last_err = e
                print(f"  Attempt {attempt} failed: {e}")
                if attempt < max_retries:
                    backoff = compute_backoff_seconds(e, retry_wait)
                    print(f"  Waiting {backoff:.1f}s before retry...")
                    time.sleep(backoff)
                else:
                    raise RuntimeError(f"Transcription failed for {p} after {max_retries} attempts: {last_err}")
    return txt_paths
def merge_texts(txt_paths: List[Path], output_path: Path, input_filename: str):
    """
    No per-part headings: write the original filename on the first line,
    then a blank line, then concatenate each TXT body separated by blank lines.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as out:
        # Original filename at the top
        out.write(f"{input_filename}\n\n")
        # Concatenate the bodies as-is (one blank line between parts)
        for i, p in enumerate(txt_paths):
            try:
                body = p.read_text(encoding="utf-8").strip()
            except Exception:
                body = ""
            if body:
                out.write(body)
                out.write("\n\n")
    return output_path
def build_args() -> Args:
    ap = argparse.ArgumentParser(description="Split/Merge audio, transcribe with Groq Whisper, and merge texts.")
    ap.add_argument("input", help="Input file or directory path (e.g., audio/xxx.mp3 or a folder of videos)")
    ap.add_argument("--source-dir", default=DEFAULT_SOURCE_DIR, help="Working dir for extracted audio from video inputs")
    ap.add_argument("--noise", type=float, default=-35.0)     # changed default
    ap.add_argument("--duration", type=float, default=3.0)    # changed default
    ap.add_argument("--target-mb", type=float, default=25.0)  # confirmed default
    ap.add_argument("--safety-mb", type=float, default=0.5)
    ap.add_argument("--parts-dir", default=DEFAULT_PARTS_DIR)
    ap.add_argument("--merged-dir", default=DEFAULT_MERGED_DIR)
    ap.add_argument("--text-dir", default=DEFAULT_TEXT_DIR)
    ap.add_argument("--final-text-dir", default=DEFAULT_FINAL_DIR)  # the final <stem>.txt is written directly under this dir
    ap.add_argument("--language", default=None)
    ap.add_argument("--prompt", default=None)
    ap.add_argument("--max-retries", type=int, default=3)
    ap.add_argument("--retry-wait", type=float, default=3.0)
    a = ap.parse_args()
    input_path = Path(a.input).expanduser().resolve()
    # If the user did not override --final-text-dir (i.e. it is still the default 'text'),
    # derive the final directory per file by replacing 'audio' -> 'text' on each input path.
    if a.final_text_dir == DEFAULT_FINAL_DIR:
        final_dir = None
        auto_final = True
    else:
        final_dir = Path(a.final_text_dir).expanduser().resolve()
        auto_final = False
    return Args(
        input=input_path,
        source_dir=Path(a.source_dir).expanduser().resolve(),
        noise=a.noise,
        duration=a.duration,
        target_mb=a.target_mb,
        safety_mb=a.safety_mb,
        parts_dir=Path(a.parts_dir).expanduser().resolve(),
        merged_dir=Path(a.merged_dir).expanduser().resolve(),
        text_dir=Path(a.text_dir).expanduser().resolve(),
        final_text_dir=final_dir,
        auto_final_dir=auto_final,
        language=a.language,
        prompt=a.prompt,
        max_retries=a.max_retries,
        retry_wait=a.retry_wait
    )
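# main() runs the full pipeline per input file: skip if the final transcript
# already exists -> extract audio if the input is a video -> silence-based
# split -> size-bounded re-merge -> Groq transcription -> merge texts.
# Example invocations (the directory form processes every probe-able media
# file directly inside the folder, non-recursively; paths are illustrative):
#   python split_merge_transcribe_groq.py audio/74_legend_2024-11-30.mp3 --language ja
#   python split_merge_transcribe_groq.py path/to/video --language ja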
def main():
    args = build_args()
    # Clean caches
    for d in [args.source_dir, args.parts_dir, args.merged_dir, args.text_dir]:
        clean_dir(d)
    inputs: List[Path] = expand_media_inputs(args.input)
    final_outputs: List[Path] = []
    for original_input in inputs:
        # Skip if the final output (transcript/text) already exists
        if args.auto_final_dir:
            per_file_final_dir = derive_final_dir_from_input(original_input)
        else:
            per_file_final_dir = args.final_text_dir if args.final_text_dir else Path(DEFAULT_FINAL_DIR).expanduser().resolve()
        precheck_final_text_path = per_file_final_dir / f"{original_input.stem}.txt"
        if precheck_final_text_path.exists():
            print(f"Skipping (already exists): {precheck_final_text_path}")
            final_outputs.append(precheck_final_text_path)
            continue
        # If the input is a video, extract its audio to MP3 first
        work_input = prepare_audio_input(original_input, args.source_dir)
        # Split & merge the prepared audio
        log_path = args.parts_dir / f"silence_{work_input.stem}.log"
        silences = detect_silences(work_input, args.noise, args.duration, log_path)
        parts = cut_by_silence(work_input, silences, args.parts_dir)
        target_bytes = int((args.target_mb - args.safety_mb) * 1024 * 1024)
        merged_paths = merge_under_limit(parts, target_bytes, args.merged_dir, work_input.stem)
        # Transcribe each merged audio chunk
        txt_paths = transcribe_groq(merged_paths, args.text_dir, args.language, args.prompt, args.max_retries, args.retry_wait)
        # Final merged text file path per original input
        if args.auto_final_dir:
            per_file_final_dir = derive_final_dir_from_input(original_input)
        else:
            per_file_final_dir = args.final_text_dir if args.final_text_dir else Path(DEFAULT_FINAL_DIR).expanduser().resolve()
        final_text_path = per_file_final_dir / f"{original_input.stem}.txt"
        merge_texts(txt_paths, final_text_path, input_filename=original_input.as_posix())
        final_outputs.append(final_text_path)
        print("\n=== DONE (file) ===")
        if work_input != original_input:
            print("Source audio     :", work_input)
        print("Audio parts dir  :", args.parts_dir)
        print("Audio merged dir :", args.merged_dir)
        print("Text parts dir   :", args.text_dir)
        print("Final transcript :", final_text_path)
    if len(final_outputs) > 1:
        print("\n=== ALL FILES DONE ===")
        for p in final_outputs:
            print("-", p)
if __name__ == "__main__":
    main()