Skip to content

Instantly share code, notes, and snippets.

@abdusco
Last active August 13, 2025 15:42
Show Gist options
  • Save abdusco/5bd5c909547f5f9b935dbd2fb2fe12a7 to your computer and use it in GitHub Desktop.
Save abdusco/5bd5c909547f5f9b935dbd2fb2fe12a7 to your computer and use it in GitHub Desktop.
Subtitle translator
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = ["srt", "httpx"]
# ///
import argparse
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import os
from pathlib import Path
import time
from typing import Iterator
import srt
import httpx
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# The OpenRouter credential is mandatory: abort immediately when it is missing or empty.
api_key = os.environ.get("OPENROUTER_API_KEY")
if not api_key:
    raise RuntimeError("OPENROUTER_API_KEY environment variable not set")

# Shared HTTP client: every request carries the bearer token, is resolved
# against the OpenRouter v1 base URL and uses a timeout of 30.
client = httpx.Client(
    base_url="https://openrouter.ai/api/v1",
    headers={"Authorization": f"Bearer {api_key}"},
    timeout=30,
)

# Thread pool (5 workers) used to translate several chunks concurrently.
pool = ThreadPoolExecutor(max_workers=5)
def retry(max_retries: int = 3, delay: float = 1.0):
    """
    Build a decorator that calls the wrapped function again when it raises.

    The wrapped function is invoked up to 'max_retries' times. Every Exception
    is logged as a warning and, unless it was the final attempt, followed by a
    pause of 'delay' seconds. The first successful return value is passed
    through unchanged.

    The returned wrapper raises RuntimeError when no attempt succeeded; the
    last underlying exception is attached as its __cause__ so the real failure
    is not lost.
    """
    import functools  # local import keeps this block self-contained

    def decorator(func):
        # Preserve the wrapped function's name and docstring for logs and debugging.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    logger.warning(f"Error: {e}, retrying {attempt + 1}/{max_retries}...")
                    # No point in sleeping after the final attempt.
                    if attempt < max_retries - 1:
                        time.sleep(delay)
            raise RuntimeError("Max retries exceeded") from last_error

        return wrapper

    return decorator
def combine_entries(
    subtitles: list[srt.Subtitle],
    max_entries: int = 2,
    max_gap_seconds: float = 1.0,
    max_words: int = 18,
) -> list[srt.Subtitle]:
    """
    Merge adjacent subtitles in groups of up to 'max_entries'.

    The next subtitle joins the current group only if both hold:
    - the gap between the end of the group's last subtitle and the start of
      the next one is <= 'max_gap_seconds'
    - the word count of the group including the next subtitle is fewer than
      'max_words'

    Each group becomes one new subtitle spanning from the first start to the
    last end. Its text is the group's content with line breaks replaced by
    spaces, re-wrapped by format_subtitle_text. The new subtitles are numbered
    from 1 and have an empty 'proprietary' field. The input list is not
    modified. Returns [] for empty input.
    """
    if not subtitles:
        return []
    merged_subtitles = []
    i = 0
    total_subs = len(subtitles)
    while i < total_subs:
        group = [subtitles[i]]
        # Running word count of the group, so it is not recomputed per candidate.
        group_word_count = len(subtitles[i].content.split())
        while len(group) < max_entries and (i + len(group)) < total_subs:
            next_sub = subtitles[i + len(group)]
            time_gap = (next_sub.start - group[-1].end).total_seconds()
            next_word_count = len(next_sub.content.split())
            if time_gap > max_gap_seconds or group_word_count + next_word_count >= max_words:
                break
            group.append(next_sub)
            group_word_count += next_word_count
        combined_text = " ".join(sub.content.replace("\n", " ") for sub in group)
        formatted_text = format_subtitle_text(combined_text.split())
        merged_subtitles.append(
            srt.Subtitle(
                index=len(merged_subtitles) + 1,
                start=group[0].start,
                end=group[-1].end,
                content=formatted_text,
                proprietary="",
            )
        )
        i += len(group)
    return merged_subtitles
def format_subtitle_text(words: list[str], max_words_per_line: int = 8) -> str:
    """
    Lay out a list of words as subtitle text with balanced line breaks.

    Text that fits within 'max_words_per_line' stays on a single line.
    Otherwise the words are spread as evenly as possible over the minimum
    number of lines, with earlier lines receiving the surplus words. A final
    line that would hold a single word is appended to the line before it.
    """
    if not words:
        return ""
    total = len(words)
    if total <= max_words_per_line:
        return " ".join(words)
    # Ceiling division: the fewest lines that respect the per-line limit.
    line_count = (total + max_words_per_line - 1) // max_words_per_line
    per_line, surplus = divmod(total, line_count)
    rows = []
    cursor = 0
    for row in range(line_count):
        # The first 'surplus' rows each take one extra word.
        take = per_line + 1 if row < surplus else per_line
        is_last_row = row == line_count - 1
        if is_last_row and take == 1 and rows:
            # Avoid a one-word last line by attaching it to the previous row.
            rows[-1] = rows[-1] + " " + words[cursor]
            break
        rows.append(" ".join(words[cursor : cursor + take]))
        cursor += take
    return "\n".join(rows)
def clean_entries(subs: list[srt.Subtitle]) -> list[srt.Subtitle]:
    """
    Normalise the text of every subtitle in place and return the same list.

    - each literal '\\h' sequence is replaced by a space
    - each literal '\\n' sequence (backslash + n) becomes a real line break
    - every line is stripped of surrounding whitespace; empty lines are dropped
    """
    for entry in subs:
        text = entry.content.replace(r"\h", " ").replace(r"\n", "\n")
        kept_lines = []
        for raw_line in text.splitlines():
            stripped = raw_line.strip()
            if stripped:
                kept_lines.append(stripped)
        entry.content = "\n".join(kept_lines)
    return subs
def remove_overlaps(subs: list[srt.Subtitle]) -> list[srt.Subtitle]:
    """
    Drop lines that a subtitle repeats from the end of the preceding one.

    For each subtitle, the longest run of leading lines equal to the trailing
    lines of the previous non-empty subtitle is removed. Subtitles left with
    no lines are omitted. The result consists of new Subtitle objects numbered
    sequentially from 1; the comparison always uses the previous subtitle's
    full (untrimmed) lines.
    """
    if not subs:
        return []
    result = []
    previous = []
    for entry in subs:
        current = [text for text in (raw.strip() for raw in entry.content.splitlines()) if text]
        if not current:
            continue
        # Longest overlap first; 0 when the head shares nothing with the previous tail.
        longest = min(len(previous), len(current))
        shared = next(
            (size for size in range(longest, 0, -1) if previous[-size:] == current[:size]),
            0,
        )
        fresh = current[shared:]
        if fresh:
            result.append(
                srt.Subtitle(
                    # Number by output position so the result is sequential from 1.
                    index=len(result) + 1,
                    start=entry.start,
                    end=entry.end,
                    content="\n".join(fresh),
                    proprietary=entry.proprietary,
                )
            )
        previous = current
    return result
def chunkify[T](it: list[T], size: int) -> Iterator[list[T]]:
"""Yield successive n-sized chunks from it."""
for i in range(0, len(it), size):
yield it[i : i + size]
# Function-tool definition for "save_subtitles". It is passed in the request's
# "tools" list and selected via "tool_choice", so the model returns its
# translations as the arguments of a call to this tool.
TOOL_SCHEMA_SAVE_SUBTITLES = {
    "type": "function",
    "function": {
        "name": "save_subtitles",
        "description": "Save translated subtitle entries",
        # JSON Schema describing the arguments of the tool call.
        "parameters": {
            "type": "object",
            "properties": {
                "entries": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        # "id" is matched against the subtitle index sent in the request;
                        # "translated" holds the translated text of that entry.
                        "properties": {"id": {"type": "integer"}, "translated": {"type": "string"}},
                        "required": ["id", "translated"],
                    },
                }
            },
            "required": ["entries"],
        },
    },
}
@retry(max_retries=5)
def _translate_chunk_with_context(
    chunk: list[srt.Subtitle],
    context_before: list[srt.Subtitle],
    context_after: list[srt.Subtitle],
    lang: str,
    model: str,
) -> list[srt.Subtitle]:
    """
    Translates a single chunk of subtitles using preceding and succeeding subtitles as context.

    Sends one chat-completion request that forces a call to the save_subtitles
    tool, then writes the returned translations into the subtitles of 'chunk'
    (matched by index) and returns them in the order the model listed them.

    The response is validated completely before any subtitle is modified, so
    a rejected response leaves 'chunk' untouched for the next retry attempt.

    Raises:
        httpx.HTTPStatusError: the API answered with an error status.
        ValueError: the response has no save_subtitles call, the number of
            returned entries differs from the chunk size, or the returned ids
            are not exactly the ids of the chunk.
    Because of the retry decorator, callers see these failures as a
    RuntimeError once all attempts are exhausted.
    """
    prompt = f"""
You are an expert subtitle translator.
You will be provided with a JSON object containing three lists: 'context_before', 'main_chunk', and 'context_after'.
Your task is to translate ONLY the subtitles in the 'main_chunk' from English to {lang}.
Use the 'context_before' and 'context_after' lists to understand the full conversational flow, resolve ambiguities, and ensure tonal consistency.
DO NOT translate the content of the 'context_before' or 'context_after' lists in your final output.
- Translate idiomatically, as a native speaker would.
- If the content is too verbose, rewrite it to be more concise while preserving the original meaning.
- Do not censor profanity or slang; keep the original tone and style.
- Add punctuation and capitalization as needed.
- Do NOT translate proper nouns, names, or technical terms; keep them in English.
- Do NOT add, remove, or merge entries.
Return ONLY the translated entries for the 'main_chunk'. The number of entries in your response must exactly match the number of entries in the input 'main_chunk'.
For each input entry in 'main_chunk', there must be one output entry with the same 'id'.
Use your save_subtitles tool to return the results.
"""
    # Create the structured payload for the API
    payload_data = {
        "context_before": [{"id": s.index, "content": s.content.strip()} for s in context_before],
        "main_chunk": [{"id": s.index, "content": s.content.strip()} for s in chunk],
        "context_after": [{"id": s.index, "content": s.content.strip()} for s in context_after],
    }
    tool_name = TOOL_SCHEMA_SAVE_SUBTITLES["function"]["name"]
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": json.dumps(payload_data, ensure_ascii=False)},
        ],
        "tools": [TOOL_SCHEMA_SAVE_SUBTITLES],
        "tool_choice": {"type": "function", "function": {"name": tool_name}},
    }
    res = client.post(url="/chat/completions", json=payload)
    if res.is_error:
        logger.error(f"Error translating subtitles: {res.text}")
    res.raise_for_status()
    data = res.json()
    # A reply without tool calls (key missing or null) falls through to the final ValueError.
    tool_calls = data["choices"][0]["message"].get("tool_calls") or []
    id_to_sub = {s.index: s for s in chunk}
    for call in tool_calls:
        if call["function"]["name"] != tool_name:
            continue
        parsed = json.loads(call["function"]["arguments"])
        translated_entries = parsed["entries"]
        if len(translated_entries) != len(chunk):
            raise ValueError(f"Translation returned {len(translated_entries)} entries, but the chunk had {len(chunk)}.")
        # Same length plus same id set means every subtitle is answered exactly
        # once; otherwise unknown or duplicated ids would silently drop subtitles.
        returned_ids = {entry["id"] for entry in translated_entries}
        if returned_ids != set(id_to_sub):
            raise ValueError(
                f"Translation returned ids {sorted(returned_ids, key=str)} "
                f"that do not match the chunk ids {sorted(id_to_sub, key=str)}."
            )
        # Reassemble subtitles with translated content (only after validation passed)
        result = []
        for entry in translated_entries:
            original_sub = id_to_sub[entry["id"]]
            original_sub.content = entry["translated"].strip()
            result.append(original_sub)
        return result
    raise ValueError("The model did not return any translated subtitles via the specified tool.")
def translate_entries(
    model: str,
    entries: list[srt.Subtitle],
    lang: str,
    chunk_size: int = 25,
    context_size: int = 2,
) -> list[srt.Subtitle]:
    """
    Translates a list of subtitle entries using a sliding window for context.

    The entries are split into chunks of 'chunk_size'. Each chunk is submitted
    to the thread pool together with up to 'context_size' entries from the end
    of the previous chunk and from the start of the next chunk. A
    'context_size' of 0 or less sends no context.

    Chunks whose translation fails are logged and left out of the result
    (best effort). The remaining subtitles are sorted by start time and
    re-indexed from 1.

    Raises:
        ValueError: 'chunk_size' is smaller than 1.
    """
    logger.info(f"Translating {len(entries)} entries to {lang}")
    if not entries:
        return []
    if chunk_size < 1:
        # Zero would fail inside range(); a negative size would yield no chunks at all.
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
    chunks = list(chunkify(entries, chunk_size))
    futures = []
    for i, chunk in enumerate(chunks):
        logger.info(f"Submitting chunk {i + 1}/{len(chunks)} for translation...")
        context_before = []
        context_after = []
        # Guard against context_size == 0: seq[-0:] would be the WHOLE previous chunk.
        if context_size > 0:
            # Get context from the previous chunk
            if i > 0:
                context_before = chunks[i - 1][-context_size:]
            # Get context from the next chunk
            if i < len(chunks) - 1:
                context_after = chunks[i + 1][:context_size]
        # Submit the translation task to the thread pool
        future = pool.submit(
            _translate_chunk_with_context,
            chunk=chunk,
            context_before=context_before,
            context_after=context_after,
            lang=lang,
            model=model,
        )
        futures.append(future)
    # Collect results in submission order (each result() blocks until that chunk is done)
    translated_subs = []
    for i, future in enumerate(futures):
        logger.info(f"Processing result of chunk {i + 1}/{len(chunks)}...")
        try:
            translated_chunk = future.result()
            translated_subs.extend(translated_chunk)
        except Exception as e:
            # Deliberate best effort: a failed chunk is reported and skipped.
            logger.error(f"Error translating chunk {i + 1}: {e}")
    # Sort subtitles by start time to ensure correct order
    translated_subs.sort(key=lambda s: s.start)
    # Re-index all subtitles to ensure they are sequential
    for i, sub in enumerate(translated_subs, 1):
        sub.index = i
    logger.info("Translation complete.")
    return translated_subs
def translate(subtitle_path: Path, lang: str, save_path: Path, model: str, condense: int | None = None) -> None:
    """
    Translate the subtitles in the given SRT file to the specified language.

    Pipeline: parse 'subtitle_path', clean the entries, remove overlapping
    lines, optionally merge adjacent entries in groups of up to 'condense',
    translate, and write the result to 'save_path'. Nothing is returned.

    When 'condense' is set, the merged but untranslated subtitles are also
    written next to 'save_path' as '<stem>.condensed.srt'.

    The input is read as UTF-8 (a leading BOM is tolerated) and all output is
    written as UTF-8, independent of the platform's default encoding.

    Raises:
        ValueError: no subtitle entries remain after cleaning.
    """
    original = list(srt.parse(subtitle_path.read_text(encoding="utf-8-sig")))
    cleaned = clean_entries(original)
    without_overlaps = remove_overlaps(cleaned)
    entries = without_overlaps
    if condense:
        condensed = combine_entries(entries, max_entries=condense)
        condensed_path = save_path.with_name(f"{save_path.stem}.condensed.srt")
        condensed_path.write_text(srt.compose(condensed), encoding="utf-8")
        logger.info(f"Condensed subtitles from {len(entries)} to {len(condensed)} entries")
        entries = condensed
    if not entries:
        raise ValueError("No valid subtitle entries found to translate")
    # Keyword arguments are required here: translate_entries takes 'model' as its
    # first positional parameter, so passing 'entries' positionally raises TypeError.
    translated = translate_entries(model=model, entries=entries, lang=lang)
    # wrap the translated version as well
    # NOTE(review): this call can also merge already-condensed entries again — confirm that is intended.
    if condense:
        translated = combine_entries(translated, max_entries=condense)
    save_path.write_text(srt.compose(translated), encoding="utf-8")
def parse_args() -> argparse.Namespace:
    """
    Parse the command line (sys.argv) of the subtitle translator.

    Positional arguments: 'input' and 'output' SRT paths (as Path objects).
    Options: '--translate' (target language code, required), '--model'
    (default "google/gemini-2.5-flash") and '--condense' (int, optional).

    '--translate' is mandatory because its value is inserted into the
    translation prompt; without it the target language would be "None".
    argparse exits the process with a usage message on invalid input.
    """
    p = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="""
Translate subtitles from SRT file using OpenRouter API.
This script reads an SRT file, translates the subtitles to the specified language,
and saves the translated subtitles to a new SRT file.
""",
    )
    p.add_argument("input", type=Path, help="Input SRT file")
    p.add_argument("output", type=Path, help="Output SRT file")
    p.add_argument("--translate", type=str, required=True, help="Language code for translation")
    p.add_argument(
        "--model",
        type=str,
        default="google/gemini-2.5-flash",
        help="Model name for translation",
    )
    p.add_argument(
        "--condense",
        type=int,
        help="Merge adjacent short subtitles in groups of N if close together",
    )
    return p.parse_args()
def main():
    """Set up logging, read the command-line arguments and run the translation."""
    log_format = "%(name)s: %(asctime)s %(levelname)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    # Only warnings and above from the httpx logger are shown.
    httpx_logger = logging.getLogger("httpx")
    httpx_logger.setLevel(logging.WARNING)

    options = parse_args()
    translate(
        subtitle_path=options.input,
        save_path=options.output,
        lang=options.translate,
        model=options.model,
        condense=options.condense,
    )


if __name__ == "__main__":
    main()
@abdusco
Copy link
Author

abdusco commented Aug 13, 2025

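Prerequisites

  • uv must be installed: the script is launched through `uv run --script`, which also installs its dependencies (srt, httpx).

  • The OPENROUTER_API_KEY environment variable must be set; the script exits with an error otherwise.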

Basic Usage

Translate an SRT file to another language:

./subtitle_translator.py input.srt output.srt --translate de

Options

  • --translate LANG
    Language code of the target language (e.g. de).

  • --model MODEL
    Specify translation model (default: google/gemini-2.5-flash).

  • --condense N
    Merge adjacent short subtitles in groups of N if close together.

Example

./subtitle_translator.py movie.srt movie.de.srt --translate de --condense 2

This translates movie.srt to German, condensing adjacent short subtitles in pairs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment