Skip to content

Instantly share code, notes, and snippets.

@supertask
Last active August 9, 2025 11:01
Show Gist options
  • Select an option

  • Save supertask/ca6ca029fc9011a38b45a222884632ec to your computer and use it in GitHub Desktop.

Select an option

Save supertask/ca6ca029fc9011a38b45a222884632ec to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
End-to-end utility for Groq Whisper:
1) Silence-based split (ffmpeg silencedetect)
2) Re-merge adjacent parts under size limit (default 25 MB)
3) Transcribe each merged audio via Groq Whisper Large v3
4) Save per-chunk TXT/JSON and merge into a single TXT
Defaults tuned for your workflow:
--noise -35, --duration 3, --target-mb 25
Dependencies:
- ffmpeg, ffprobe (brew install ffmpeg)
- pip install groq
- export GROQ_API_KEY=...
Usage example:
python split_merge_transcribe_groq.py audio/74_legend_2024-11-30.mp3 --language ja
"""
import argparse
import json
import os
import re
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional
# ===== Directories (clean names) =====
# Working caches for intermediate artifacts; cleaned at the start of each run.
DEFAULT_SOURCE_DIR = "cache/audio_source"
DEFAULT_PARTS_DIR = "cache/audio_parts"
DEFAULT_MERGED_DIR = "cache/audio_merged"
DEFAULT_TEXT_DIR = "cache/text_parts"
# Final transcripts land directly under this directory as <stem>.txt.
DEFAULT_FINAL_DIR = "text"
# ===== Model =====
MODEL_NAME = "whisper-large-v3" # as requested
@dataclass
class Args:
    """Resolved command-line options for the split/merge/transcribe pipeline."""

    input: Path                      # input media file or directory
    source_dir: Path                 # working dir for audio extracted from video
    noise: float                     # silencedetect noise threshold in dB
    duration: float                  # minimum silence duration in seconds
    target_mb: float                 # per-chunk size limit in MB
    safety_mb: float                 # safety margin subtracted from target_mb
    parts_dir: Path                  # dir for silence-split parts
    merged_dir: Path                 # dir for re-merged chunks
    text_dir: Path                   # dir for per-chunk TXT/JSON outputs
    final_text_dir: Optional[Path]   # explicit final output dir, or None when auto
    auto_final_dir: bool             # derive final dir per input file when True
    language: Optional[str]          # Whisper language hint (e.g. "ja")
    prompt: Optional[str]            # optional transcription prompt
    max_retries: int                 # transcription retry attempts per chunk
    retry_wait: float                # base wait between retries (seconds)
def run(cmd: List[str], quiet=False):
    """Execute *cmd*, capturing stdout/stderr; raise RuntimeError on failure.

    Unless *quiet*, the shell-quoted command line is echoed first. On a
    non-zero exit code, up to 1500 chars of stderr are forwarded before
    raising.
    """
    if not quiet:
        print("+", " ".join(shlex.quote(part) for part in cmd), flush=True)
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        tail = "\n...truncated...\n" if len(result.stderr) > 1500 else ""
        sys.stderr.write(result.stderr[:1500] + tail)
        raise RuntimeError(f"Command failed: {' '.join(cmd)}")
    return result
def ffprobe_duration(path: Path) -> float:
    """Return the container duration of *path* in seconds via ffprobe."""
    probe = run(
        ["ffprobe", "-v", "error",
         "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1",
         str(path)],
        quiet=True,
    )
    return float(probe.stdout.strip())
def has_audio_stream(path: Path) -> bool:
    """
    Returns True if the media file contains an audio stream.

    Probes the first audio stream index with ffprobe; any non-empty
    output means an audio stream exists.
    """
    probe = run(
        ["ffprobe", "-v", "error",
         "-select_streams", "a:0",
         "-show_entries", "stream=index",
         "-of", "csv=p=0",
         str(path)],
        quiet=True,
    )
    return bool(probe.stdout.strip())
def derive_final_dir_from_input(input_path: Path) -> Path:
    """
    Derive the final transcript directory from the input file's parent path.

    Each path element is mapped as follows:
      - "video" -> "transcript"
      - "audio" -> "text"

    Examples:
      /foo/bar/video/baz.mp4 -> /foo/bar/transcript
      /foo/bar/audio/baz.mp3 -> /foo/bar/text

    When neither element occurs, falls back to the default "text" directory
    resolved relative to the current working directory.
    """
    mapping = {"video": "transcript", "audio": "text"}
    original = list(input_path.parent.parts)
    mapped: List[str] = [mapping.get(part, part) for part in original]
    if mapped != original and mapped:
        return Path(*mapped)
    # Fallback: "text" under the current directory.
    return Path(DEFAULT_FINAL_DIR).expanduser().resolve()
def derive_audio_dir_from_input(input_path: Path) -> Optional[Path]:
    """
    Return the input's parent directory with every path element "video"
    replaced by "audio", or None when no "video" element is present.

    Example: /foo/bar/video/baz.mp4 -> /foo/bar/audio
    """
    parent_parts = input_path.parent.parts
    if "video" not in parent_parts:
        return None
    swapped: List[str] = ["audio" if part == "video" else part for part in parent_parts]
    return Path(*swapped)
def clean_dir(path: Path):
    """Create *path* if needed and delete every regular file directly inside it.

    Subdirectories are left alone; deletion failures are ignored (best-effort).
    """
    path.mkdir(parents=True, exist_ok=True)
    for entry in list(path.iterdir()):
        if not entry.is_file():
            continue
        try:
            entry.unlink()
        except Exception:
            pass  # best-effort cleanup
def detect_silences(input_path: Path, noise_db: float, min_silence: float, log_path: Path) -> List[Tuple[float, float]]:
    """
    Run ffmpeg's silencedetect filter and parse (start, end) silence pairs
    from its stderr. The raw stderr log is saved to *log_path*.

    A pair element may be None when a start/end has no matching counterpart
    (e.g. silence running to the end of the file).
    """
    filter_expr = f"silencedetect=noise={noise_db}dB:d={min_silence}"
    cmd = ["ffmpeg", "-hide_banner", "-i", str(input_path), "-af", filter_expr, "-f", "null", "-"]
    print("Detecting silences...", flush=True)
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    log_path.write_text(proc.stderr, encoding="utf-8")
    starts: List[float] = []
    ends: List[float] = []
    for line in proc.stderr.splitlines():
        if "silence_start:" in line:
            m = re.search(r"silence_start:\s*([0-9.]+)", line)
            if m:
                starts.append(float(m.group(1)))
        elif "silence_end:" in line:
            m = re.search(r"silence_end:\s*([0-9.]+)", line)
            if m:
                ends.append(float(m.group(1)))
    # Zip starts with ends, tolerating unmatched entries on either side.
    pairs: List[Tuple[float, float]] = []
    i = j = 0
    while i < len(starts) or j < len(ends):
        if i < len(starts) and (j >= len(ends) or starts[i] < ends[j]):
            if j < len(ends):
                pairs.append((starts[i], ends[j]))
                i += 1
                j += 1
            else:
                pairs.append((starts[i], None))
                i += 1
        else:
            pairs.append((None, ends[j]))
            j += 1
    return pairs
def cut_by_silence(input_path: Path, silences: List[Tuple[float, float]], outdir: Path) -> List[Path]:
    """
    Cut *input_path* at every silence end time (stream copy, no re-encode)
    and return the ordered list of part files written into *outdir*.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    cut_points = sorted({end for _, end in silences if end is not None})
    stem = input_path.stem
    ext = input_path.suffix.lstrip(".")
    parts: List[Path] = []
    prev = 0.0
    for idx, point in enumerate(cut_points, start=1):
        out = outdir / f"{stem}_part{idx}.{ext}"
        run(["ffmpeg", "-hide_banner", "-y", "-i", str(input_path),
             "-ss", f"{prev}", "-to", f"{point}", "-c", "copy", str(out)])
        parts.append(out)
        prev = point
    # Tail segment: from the last cut point to the end of the file.
    tail = outdir / f"{stem}_part{len(cut_points) + 1}.{ext}"
    run(["ffmpeg", "-hide_banner", "-y", "-i", str(input_path),
         "-ss", f"{prev}", "-c", "copy", str(tail)])
    parts.append(tail)
    return parts
def ensure_under_limit(path: Path, target_bytes: int) -> List[Path]:
    """
    Recursively bisect *path* by duration (stream copy) until every piece is
    at most *target_bytes*; the original oversized file is removed and the
    resulting file list is returned.
    """
    if path.stat().st_size <= target_bytes:
        return [path]
    print(f"Chunk {path.name} is {path.stat().st_size/1024/1024:.2f} MB > limit; splitting...", flush=True)
    half = ffprobe_duration(path) / 2.0
    parent = path.parent
    stem = path.stem
    ext = path.suffix.lstrip(".")
    first = parent / f"{stem}_a.{ext}"
    second = parent / f"{stem}_b.{ext}"
    run(["ffmpeg", "-hide_banner", "-y", "-i", str(path), "-t", f"{half}", "-c", "copy", str(first)])
    run(["ffmpeg", "-hide_banner", "-y", "-i", str(path), "-ss", f"{half}", "-c", "copy", str(second)])
    try:
        path.unlink()
    except Exception:
        pass  # best-effort removal of the oversized original
    results: List[Path] = []
    for piece in (first, second):
        results.extend(ensure_under_limit(piece, target_bytes))
    return results
def merge_under_limit(parts: List[Path], target_bytes: int, merged_dir: Path, base_stem: str) -> List[Path]:
    """
    Greedily pack adjacent parts into groups whose combined size stays under
    *target_bytes*, then concatenate each group with ffmpeg's concat demuxer.

    Parts individually over the limit are first bisected via
    ensure_under_limit(). Returns the list of merged chunk paths.
    """
    merged_dir.mkdir(parents=True, exist_ok=True)
    normalized: List[Path] = []
    for p in parts:
        normalized.extend(ensure_under_limit(p, target_bytes))
    # Greedy adjacent grouping under the size budget.
    groups: List[List[Path]] = []
    cur: List[Path] = []
    cur_size = 0
    for p in normalized:
        s = p.stat().st_size
        if not cur:
            cur, cur_size = [p], s
        elif cur_size + s <= target_bytes:
            cur.append(p)
            cur_size += s
        else:
            groups.append(cur)
            cur, cur_size = [p], s
    if cur:
        groups.append(cur)
    outputs: List[Path] = []
    for i, group in enumerate(groups, start=1):
        list_path = merged_dir / f"{base_stem}_group{i}.txt"
        with open(list_path, "w", encoding="utf-8") as f:
            for p in group:
                # Fix: the concat demuxer treats ' as a quoting character, so
                # a literal single quote in a path must be written as '\''.
                escaped = p.as_posix().replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        out_path = merged_dir / f"{base_stem}_merged_{i}{group[0].suffix}"
        run(["ffmpeg", "-hide_banner", "-y", "-f", "concat", "-safe", "0",
             "-i", str(list_path), "-c", "copy", str(out_path)])
        outputs.append(out_path)
    return outputs
def has_video_stream(path: Path) -> bool:
    """
    Returns True if the media file contains a video stream.

    Probes the first video stream index with ffprobe; any non-empty
    output means a video stream exists.
    """
    probe = run(
        ["ffprobe", "-v", "error",
         "-select_streams", "v:0",
         "-show_entries", "stream=index",
         "-of", "csv=p=0",
         str(path)],
        quiet=True,
    )
    return bool(probe.stdout.strip())
def convert_to_mp3(input_path: Path, output_path: Path, bitrate_kbps: str = "128k") -> Path:
    """
    Extract the audio track to MP3 (libmp3lame) at *bitrate_kbps* and return
    the output path. Parent directories are created as needed.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    run(["ffmpeg", "-hide_banner", "-y", "-i", str(input_path),
         "-vn", "-acodec", "libmp3lame", "-ab", bitrate_kbps, str(output_path)])
    return output_path
def prepare_audio_input(input_path: Path, source_dir: Path) -> Path:
    """
    If the input contains video, extract its audio to MP3 and return that
    path; otherwise return the input unchanged. Probe failures fall back to
    the original input.
    """
    try:
        if has_video_stream(input_path):
            # Prefer a 'video' -> 'audio' substituted directory when the
            # input path contains a 'video' element; else use source_dir.
            derived_dir = derive_audio_dir_from_input(input_path)
            target_dir = source_dir if derived_dir is None else derived_dir
            return convert_to_mp3(input_path, target_dir / f"{input_path.stem}.mp3", "128k")
    except Exception:
        pass  # probing failed; treat the input as already-audio
    return input_path
def extract_retry_after_seconds(message: str) -> Optional[float]:
    """
    Parse a server-suggested wait time out of an error message.

    Handles patterns like "Please try again in 8m10.422s" and
    "Please try again in 12.5s"; generalized to also accept an optional
    hours component ("Please try again in 1h2m3s"). Returns the total wait
    in seconds, or None when no hint is found.
    """
    try:
        m = re.search(
            r"Please try again in\s+(?:(\d+)h)?(?:(\d+)m)?(\d+(?:\.\d+)?)s",
            message,
        )
        if m:
            hours = int(m.group(1)) if m.group(1) else 0
            minutes = int(m.group(2)) if m.group(2) else 0
            seconds = float(m.group(3))
            return hours * 3600 + minutes * 60 + seconds
    except Exception:
        pass  # best-effort parsing; fall through to None
    return None
def compute_backoff_seconds(error: Exception, default_wait: float) -> float:
    """
    Choose how long to wait before retrying after *error*.

    Rate-limit (429) errors prefer the server-suggested delay when one can
    be parsed from the message, clamped with a small safety margin;
    otherwise they fall back to at least 60s. Other errors wait at least 1s.
    """
    message = str(error)
    lowered = message.lower()
    rate_limited = "429" in message or "rate limit" in lowered or "rate_limit" in lowered
    if not rate_limited:
        return max(1.0, default_wait)
    hinted = extract_retry_after_seconds(message)
    if hinted is not None:
        # Add a small safety margin, clamped to [5s, 1h].
        return max(5.0, min(hinted + 2.0, 3600.0))
    # Sensible default when the server gave no hint.
    return max(60.0, default_wait)
def expand_media_inputs(input_path: Path) -> List[Path]:
    """
    Resolve *input_path* into a list of media files.

    A file maps to itself. A directory yields its immediate child files
    that probe as audio or video (non-recursive by design). Anything else
    raises FileNotFoundError.
    """
    if input_path.is_file():
        return [input_path]
    if not input_path.is_dir():
        raise FileNotFoundError(f"Input path not found: {input_path}")
    found: List[Path] = []
    for child in sorted(input_path.iterdir()):
        if not child.is_file():
            continue
        try:
            if has_video_stream(child) or has_audio_stream(child):
                found.append(child)
        except Exception:
            pass  # skip unreadable/unprobeable files
    return found
def transcribe_groq(paths: List[Path], text_dir: Path, language: Optional[str], prompt: Optional[str], max_retries: int, retry_wait: float) -> List[Path]:
    """
    Transcribe each audio file with the Groq Whisper model (MODEL_NAME),
    writing <stem>.json (verbose response) and <stem>.txt into *text_dir*.

    Chunks with both outputs already present are skipped (resume support).
    Each chunk is retried up to *max_retries* times with backoff; exhausting
    the retries raises RuntimeError. Returns the list of TXT paths.
    """
    # Lazy import so the rest of the tool works without the package installed.
    try:
        from groq import Groq
    except Exception as e:
        raise RuntimeError("groq Python package is required. Install with: pip install groq") from e
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("Environment variable GROQ_API_KEY is not set.")
    client = Groq(api_key=api_key)
    text_dir.mkdir(parents=True, exist_ok=True)
    txt_paths: List[Path] = []
    for idx, p in enumerate(paths, start=1):
        base = p.stem
        txt_out = text_dir / f"{base}.txt"
        json_out = text_dir / f"{base}.json"
        if txt_out.exists() and json_out.exists():
            # Already transcribed in a previous run.
            txt_paths.append(txt_out)
            continue
        print(f"Transcribing [{idx}/{len(paths)}]: {p.name}")
        last_err = None
        for attempt in range(1, max_retries + 1):
            try:
                with open(p, "rb") as f:
                    resp = client.audio.transcriptions.create(
                        file=(p.name, f.read(), "audio/mpeg"),
                        model=MODEL_NAME,
                        response_format="verbose_json",
                        language=language,
                        temperature=0,
                        prompt=prompt
                    )
                # Normalize the SDK response object into a plain dict.
                try:
                    data = resp.to_dict() if hasattr(resp, "to_dict") else json.loads(resp.model_dump_json())
                except Exception:
                    data = {"text": getattr(resp, "text", "")}
                text = data.get("text") or ""
                json_out.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
                txt_out.write_text(text, encoding="utf-8")
                txt_paths.append(txt_out)
                break
            except Exception as e:
                last_err = e
                print(f"  Attempt {attempt} failed: {e}")
                if attempt < max_retries:
                    backoff = compute_backoff_seconds(e, retry_wait)
                    try:
                        print(f"  Waiting {backoff:.1f}s before retry...")
                    except Exception:
                        pass
                    time.sleep(backoff)
                else:
                    raise RuntimeError(f"Transcription failed for {p} after {max_retries} attempts: {last_err}")
    return txt_paths
def merge_texts(txt_paths: List[Path], output_path: Path, input_filename: str):
    """
    Concatenate the chunk transcripts into a single file.

    No per-part headings: the first line is the original filename, followed
    by a blank line, then each non-empty chunk body separated by one blank
    line. Unreadable chunks are treated as empty. Returns *output_path*.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as out:
        # Original filename on the first line.
        out.write(f"{input_filename}\n\n")
        for chunk in txt_paths:
            try:
                body = chunk.read_text(encoding="utf-8").strip()
            except Exception:
                body = ""
            if body:
                out.write(body)
                out.write("\n\n")
    return output_path
def build_args() -> Args:
    """Parse the command line and resolve all paths into an Args instance."""
    ap = argparse.ArgumentParser(description="Split/Merge audio, transcribe with Groq Whisper, and merge texts.")
    ap.add_argument("input", help="Input file or directory path (e.g., audio/xxx.mp3 or a folder of videos)")
    ap.add_argument("--source-dir", default=DEFAULT_SOURCE_DIR, help="Working dir for extracted audio from video inputs")
    ap.add_argument("--noise", type=float, default=-35.0)
    ap.add_argument("--duration", type=float, default=3.0)
    ap.add_argument("--target-mb", type=float, default=25.0)
    ap.add_argument("--safety-mb", type=float, default=0.5)
    ap.add_argument("--parts-dir", default=DEFAULT_PARTS_DIR)
    ap.add_argument("--merged-dir", default=DEFAULT_MERGED_DIR)
    ap.add_argument("--text-dir", default=DEFAULT_TEXT_DIR)
    # Final transcripts are written directly under this dir as <stem>.txt.
    ap.add_argument("--final-text-dir", default=DEFAULT_FINAL_DIR)
    ap.add_argument("--language", default=None)
    ap.add_argument("--prompt", default=None)
    ap.add_argument("--max-retries", type=int, default=3)
    ap.add_argument("--retry-wait", type=float, default=3.0)
    parsed = ap.parse_args()
    # When --final-text-dir is left at its default ('text'), derive the final
    # directory per input file instead ('audio' -> 'text' on its path).
    auto_final = parsed.final_text_dir == DEFAULT_FINAL_DIR
    final_dir = None if auto_final else Path(parsed.final_text_dir).expanduser().resolve()
    return Args(
        input=Path(parsed.input).expanduser().resolve(),
        source_dir=Path(parsed.source_dir).expanduser().resolve(),
        noise=parsed.noise,
        duration=parsed.duration,
        target_mb=parsed.target_mb,
        safety_mb=parsed.safety_mb,
        parts_dir=Path(parsed.parts_dir).expanduser().resolve(),
        merged_dir=Path(parsed.merged_dir).expanduser().resolve(),
        text_dir=Path(parsed.text_dir).expanduser().resolve(),
        final_text_dir=final_dir,
        auto_final_dir=auto_final,
        language=parsed.language,
        prompt=parsed.prompt,
        max_retries=parsed.max_retries,
        retry_wait=parsed.retry_wait
    )
def main():
    """Drive the full pipeline: clean caches, then split/merge/transcribe each input."""
    args = build_args()
    # Wipe all working caches before the run.
    for d in [args.source_dir, args.parts_dir, args.merged_dir, args.text_dir]:
        clean_dir(d)
    inputs: List[Path] = expand_media_inputs(args.input)
    final_outputs: List[Path] = []
    for original_input in inputs:
        # Determine where this file's final transcript goes.
        if args.auto_final_dir:
            per_file_final_dir = derive_final_dir_from_input(original_input)
        else:
            per_file_final_dir = args.final_text_dir if args.final_text_dir else Path(DEFAULT_FINAL_DIR).expanduser().resolve()
        final_text_path = per_file_final_dir / f"{original_input.stem}.txt"
        # Skip inputs whose final transcript already exists.
        if final_text_path.exists():
            print(f"Skipping (already exists): {final_text_path}")
            final_outputs.append(final_text_path)
            continue
        # Video inputs get their audio extracted to MP3 first.
        work_input = prepare_audio_input(original_input, args.source_dir)
        # Split on silence, then re-merge under the upload size budget.
        log_path = args.parts_dir / f"silence_{work_input.stem}.log"
        silences = detect_silences(work_input, args.noise, args.duration, log_path)
        parts = cut_by_silence(work_input, silences, args.parts_dir)
        target_bytes = int((args.target_mb - args.safety_mb) * 1024 * 1024)
        merged_paths = merge_under_limit(parts, target_bytes, args.merged_dir, work_input.stem)
        # Transcribe each merged chunk, then join the texts.
        txt_paths = transcribe_groq(merged_paths, args.text_dir, args.language, args.prompt, args.max_retries, args.retry_wait)
        merge_texts(txt_paths, final_text_path, input_filename=original_input.as_posix())
        final_outputs.append(final_text_path)
        print("\n=== DONE (file) ===")
        if work_input != original_input:
            print("Source audio :", work_input)
        print("Audio parts dir :", args.parts_dir)
        print("Audio merged dir :", args.merged_dir)
        print("Text parts dir :", args.text_dir)
        print("Final transcript :", final_text_path)
    if len(final_outputs) > 1:
        print("\n=== ALL FILES DONE ===")
        for p in final_outputs:
            print("-", p)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment