Last active
August 13, 2025 15:42
-
-
Save abdusco/5bd5c909547f5f9b935dbd2fb2fe12a7 to your computer and use it in GitHub Desktop.
Subtitle translator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script | |
# /// script | |
# dependencies = ["srt", "httpx"] | |
# /// | |
import argparse | |
from concurrent.futures import ThreadPoolExecutor | |
import json | |
import logging | |
import os | |
from pathlib import Path | |
import time | |
from typing import Iterator | |
import srt | |
import httpx | |
# Module-wide logger; handlers and levels are configured by main().
logger = logging.getLogger(__name__)

# The OpenRouter credential is read once at import time, so a missing key
# fails immediately instead of on the first request.
api_key = os.environ.get("OPENROUTER_API_KEY")
if not api_key:
    raise RuntimeError("OPENROUTER_API_KEY environment variable not set")

# Shared HTTP client: every request carries the bearer token and is resolved
# against the OpenRouter v1 base URL.
client = httpx.Client(
    base_url="https://openrouter.ai/api/v1",
    headers={"Authorization": f"Bearer {api_key}"},
    timeout=30,
)

# Worker pool used to translate several chunks at the same time.
pool = ThreadPoolExecutor(max_workers=5)
def retry(max_retries: int = 3, delay: float = 1.0):
    """
    Build a decorator that re-runs the wrapped function when it raises.

    The wrapped function is called up to ``max_retries`` times. Every failure is
    logged as a warning, and ``delay`` seconds are slept between attempts (not
    after the final one). The first successful return value is passed through.

    Raises:
        RuntimeError: when every attempt failed; the last underlying exception
            is attached as ``__cause__`` so the real error is not lost.
    """
    # Imported locally so this block does not depend on the file-level imports.
    from functools import wraps

    def decorator(func):
        @wraps(func)  # keep the wrapped function's name/docstring for logs and debugging
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    logger.warning(f"Error: {e}, retrying {attempt + 1}/{max_retries}...")
                    if attempt < max_retries - 1:
                        time.sleep(delay)
            raise RuntimeError("Max retries exceeded") from last_error

        return wrapper

    return decorator
def combine_entries(
    subtitles: list[srt.Subtitle],
    max_entries: int = 2,
    *,
    max_gap_seconds: float = 1.0,
    max_words: int = 18,
) -> list[srt.Subtitle]:
    """
    Merge adjacent subtitles into groups of up to 'max_entries' entries.

    A following entry joins the current group only if:
    - the gap between the end of the group's last entry and the start of the
      next one is <= 'max_gap_seconds', and
    - the word count of the group including the next entry stays below 'max_words'.

    Each group becomes one new subtitle spanning from the first entry's start to
    the last entry's end. Its text is re-wrapped with format_subtitle_text, the
    'proprietary' field is reset to "", and indices are renumbered from 1.
    The input subtitles are not modified.
    """
    if not subtitles:
        return []

    merged_subtitles = []
    total_subs = len(subtitles)
    i = 0
    while i < total_subs:
        group = [subtitles[i]]
        # Running word count of the group, so it is not recomputed for every candidate.
        group_words = len(subtitles[i].content.split())

        while len(group) < max_entries and i + len(group) < total_subs:
            candidate = subtitles[i + len(group)]
            time_gap = (candidate.start - group[-1].end).total_seconds()
            candidate_words = len(candidate.content.split())
            if time_gap > max_gap_seconds or group_words + candidate_words >= max_words:
                break
            group.append(candidate)
            group_words += candidate_words

        # Flatten existing line breaks before re-wrapping the combined text.
        combined_text = " ".join(sub.content.replace("\n", " ") for sub in group)
        merged_subtitles.append(
            srt.Subtitle(
                index=len(merged_subtitles) + 1,
                start=group[0].start,
                end=group[-1].end,
                content=format_subtitle_text(combined_text.split()),
                proprietary="",
            )
        )
        i += len(group)

    return merged_subtitles
def format_subtitle_text(words: list[str], max_words_per_line: int = 8) -> str:
    """
    Lay out words as subtitle text with evenly sized lines.

    Text that fits within max_words_per_line stays on a single line. Longer text
    is split over the smallest number of lines that respects the limit, and the
    words are spread as evenly as possible, earlier lines receiving the surplus.
    A final line that would hold a single word is appended to the previous line.
    """
    if not words:
        return ""

    total = len(words)
    if total <= max_words_per_line:
        return " ".join(words)

    # Ceiling division: the fewest lines able to hold every word.
    line_count = (total + max_words_per_line - 1) // max_words_per_line
    per_line, surplus = divmod(total, line_count)

    rows: list[str] = []
    cursor = 0
    for row in range(line_count):
        # The first 'surplus' rows each take one additional word.
        take = per_line + 1 if row < surplus else per_line
        is_last_row = row == line_count - 1
        if is_last_row and take == 1 and rows:
            # Avoid a lonely trailing word: attach it to the previous row.
            rows[-1] = rows[-1] + " " + words[cursor]
            break
        rows.append(" ".join(words[cursor : cursor + take]))
        cursor += take

    return "\n".join(rows)
def clean_entries(subs: list[srt.Subtitle]) -> list[srt.Subtitle]:
    """
    Normalise the text of every subtitle in place and return the same list.

    For each entry: literal '\\h' sequences become spaces, literal '\\n'
    sequences become real line breaks, every line is stripped of surrounding
    whitespace, and lines that end up empty are dropped.
    """
    for entry in subs:
        text = entry.content.replace(r"\h", " ").replace(r"\n", "\n")
        kept_lines = [stripped for raw in text.splitlines() if (stripped := raw.strip())]
        entry.content = "\n".join(kept_lines)
    return subs
def remove_overlaps(subs: list[srt.Subtitle]) -> list[srt.Subtitle]:
    """
    Drop lines that a subtitle repeats from the end of the entry before it.

    The leading lines of each entry are compared with the trailing lines of the
    previous non-empty entry; the longest matching run is removed from the
    current entry. Entries with no text, or with nothing left after trimming,
    are omitted. Results are new Subtitle objects numbered consecutively from 1.
    """
    if not subs:
        return []

    kept: list[srt.Subtitle] = []
    previous: list[str] = []
    for entry in subs:
        current = [text for text in (raw.strip() for raw in entry.content.splitlines()) if text]
        if not current:
            continue

        # Longest run k where the last k previous lines equal the first k current lines.
        limit = min(len(previous), len(current))
        shared = next(
            (k for k in range(limit, 0, -1) if previous[-k:] == current[:k]),
            0,
        )

        remainder = current[shared:]
        if remainder:
            kept.append(
                srt.Subtitle(
                    index=len(kept) + 1,  # final sequential position
                    start=entry.start,
                    end=entry.end,
                    content="\n".join(remainder),
                    proprietary=entry.proprietary,
                )
            )
        # The next entry is compared against this entry's full (untrimmed) lines.
        previous = current

    return kept
def chunkify[T](it: list[T], size: int) -> Iterator[list[T]]: | |
"""Yield successive n-sized chunks from it.""" | |
for i in range(0, len(it), size): | |
yield it[i : i + size] | |
# Schema of a single translated entry: the subtitle id plus its translated text.
_TRANSLATED_ENTRY_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "translated": {"type": "string"},
    },
    "required": ["id", "translated"],
}

# Arguments accepted by the tool: one required array of translated entries.
_SAVE_SUBTITLES_PARAMETERS = {
    "type": "object",
    "properties": {
        "entries": {
            "type": "array",
            "items": _TRANSLATED_ENTRY_SCHEMA,
        }
    },
    "required": ["entries"],
}

# Function-tool definition sent with each chat completion request; the model
# returns its translations as the arguments of a "save_subtitles" call.
TOOL_SCHEMA_SAVE_SUBTITLES = {
    "type": "function",
    "function": {
        "name": "save_subtitles",
        "description": "Save translated subtitle entries",
        "parameters": _SAVE_SUBTITLES_PARAMETERS,
    },
}
@retry(max_retries=5)
def _translate_chunk_with_context(
    chunk: list[srt.Subtitle],
    context_before: list[srt.Subtitle],
    context_after: list[srt.Subtitle],
    lang: str,
    model: str,
) -> list[srt.Subtitle]:
    """
    Translates a single chunk of subtitles using preceding and succeeding subtitles as context.

    The chunk and its context are sent as one JSON document; the model must answer
    through the save_subtitles tool. The input subtitles are NOT modified: the same
    Subtitle objects are shared as context with neighbouring chunks that may be
    translated concurrently, so translated copies are returned instead.

    Returns:
        New Subtitle objects, in the order of 'chunk', with the same index and
        timing as the originals and the translated text as content.

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.
        ValueError: if the model did not call the tool, or the returned entries do
            not correspond one-to-one to the ids of 'chunk'.
        (Any exception makes the retry decorator try again; once all attempts are
        used up it raises RuntimeError.)
    """
    prompt = f"""
    You are an expert subtitle translator.
    You will be provided with a JSON object containing three lists: 'context_before', 'main_chunk', and 'context_after'.
    Your task is to translate ONLY the subtitles in the 'main_chunk' from English to {lang}.
    Use the 'context_before' and 'context_after' lists to understand the full conversational flow, resolve ambiguities, and ensure tonal consistency.
    DO NOT translate the content of the 'context_before' or 'context_after' lists in your final output.
    - Translate idiomatically, as a native speaker would.
    - If the content is too verbose, rewrite it to be more concise while preserving the original meaning.
    - Do not censor profanity or slang; keep the original tone and style.
    - Add punctuation and capitalization as needed.
    - Do NOT translate proper nouns, names, or technical terms; keep them in English.
    - Do NOT add, remove, or merge entries.
    Return ONLY the translated entries for the 'main_chunk'. The number of entries in your response must exactly match the number of entries in the input 'main_chunk'.
    For each input entry in 'main_chunk', there must be one output entry with the same 'id'.
    Use your save_subtitles tool to return the results.
    """
    tool_name = TOOL_SCHEMA_SAVE_SUBTITLES["function"]["name"]

    # Create the structured payload for the API
    payload_data = {
        "context_before": [{"id": s.index, "content": s.content.strip()} for s in context_before],
        "main_chunk": [{"id": s.index, "content": s.content.strip()} for s in chunk],
        "context_after": [{"id": s.index, "content": s.content.strip()} for s in context_after],
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": json.dumps(payload_data, ensure_ascii=False)},
        ],
        "tools": [TOOL_SCHEMA_SAVE_SUBTITLES],
        # Force the model to answer through the tool rather than free text.
        "tool_choice": {"type": "function", "function": {"name": tool_name}},
    }

    res = client.post(url="/chat/completions", json=payload)
    if res.is_error:
        logger.error(f"Error translating subtitles: {res.text}")
    res.raise_for_status()

    data = res.json()
    # The key may be absent or null when the model answered with plain text.
    tool_calls = data["choices"][0]["message"].get("tool_calls") or []
    for call in tool_calls:
        if call["function"]["name"] != tool_name:
            continue

        parsed = json.loads(call["function"]["arguments"])
        translated_entries = parsed["entries"]
        if len(translated_entries) != len(chunk):
            raise ValueError(f"Translation returned {len(translated_entries)} entries, but the chunk had {len(chunk)}.")

        # Unknown, missing or duplicated ids would silently lose subtitles;
        # reject the response so the retry decorator asks again.
        translations = {int(entry["id"]): entry["translated"].strip() for entry in translated_entries}
        if set(translations) != {s.index for s in chunk}:
            raise ValueError("Translation returned entry ids that do not match the ids of the chunk.")

        # Reassemble subtitles with translated content, leaving the inputs untouched
        return [
            srt.Subtitle(
                index=s.index,
                start=s.start,
                end=s.end,
                content=translations[s.index],
                proprietary=s.proprietary,
            )
            for s in chunk
        ]

    raise ValueError("The model did not return any translated subtitles via the specified tool.")
def translate_entries(
    model: str,
    entries: list[srt.Subtitle],
    lang: str,
    chunk_size: int = 25,
    context_size: int = 2,
) -> list[srt.Subtitle]:
    """
    Translates a list of subtitle entries using a sliding window for context.

    The entries are split into chunks of 'chunk_size'. Each chunk is submitted to
    the shared thread pool together with up to 'context_size' entries from the end
    of the previous chunk and from the start of the next chunk. A 'context_size'
    of 0 (or less) sends no context at all.

    Returns:
        The translated subtitles sorted by start time and re-indexed from 1.
        A chunk whose translation fails is logged and left out of the result.
    """
    logger.info(f"Translating {len(entries)} entries to {lang}")
    if not entries:
        return []

    chunks = list(chunkify(entries, chunk_size))
    futures = []
    for i, chunk in enumerate(chunks):
        logger.info(f"Submitting chunk {i + 1}/{len(chunks)} for translation...")

        # The explicit context_size guard matters: seq[-0:] is the WHOLE list,
        # so slicing alone would send the entire previous chunk as context.
        context_before = []
        if i > 0 and context_size > 0:
            context_before = chunks[i - 1][-context_size:]

        context_after = []
        if i < len(chunks) - 1 and context_size > 0:
            context_after = chunks[i + 1][:context_size]

        # Submit the translation task to the thread pool
        future = pool.submit(
            _translate_chunk_with_context,
            chunk=chunk,
            context_before=context_before,
            context_after=context_after,
            lang=lang,
            model=model,
        )
        futures.append(future)

    # Collect results in submission order (each result() call blocks until that chunk is done)
    translated_subs = []
    for i, future in enumerate(futures):
        logger.info(f"Processing result of chunk {i + 1}/{len(chunks)}...")
        try:
            translated_chunk = future.result()
            translated_subs.extend(translated_chunk)
        except Exception as e:
            # Best effort: a failed chunk is reported and skipped, so the output
            # will be missing those entries.
            logger.error(f"Error translating chunk {i + 1}: {e}")

    # Sort subtitles by start time to ensure correct order
    translated_subs.sort(key=lambda s: s.start)

    # Re-index all subtitles to ensure they are sequential
    for i, sub in enumerate(translated_subs, 1):
        sub.index = i

    logger.info("Translation complete.")
    return translated_subs
def translate(subtitle_path: Path, lang: str, save_path: Path, model: str, condense: int | None = None) -> None:
    """
    Translate the subtitles in the given SRT file to the specified language.

    The input is parsed, cleaned (clean_entries) and de-duplicated (remove_overlaps).
    When 'condense' is set, adjacent entries are merged in groups of up to that
    many entries and the merged, untranslated subtitles are additionally written
    next to the output as '<output stem>.condensed.srt'. The translated subtitles
    are written to 'save_path'. Nothing is returned.

    Files are read and written as UTF-8 (a leading BOM in the input is ignored).

    Raises:
        ValueError: if no subtitle entries remain after cleaning.
    """
    # utf-8-sig decodes plain UTF-8 as well and strips a BOM if one is present.
    original = list(srt.parse(subtitle_path.read_text(encoding="utf-8-sig")))
    cleaned = clean_entries(original)
    entries = remove_overlaps(cleaned)

    if condense:
        condensed = combine_entries(entries, max_entries=condense)
        condensed_path = save_path.with_name(f"{save_path.stem}.condensed.srt")
        condensed_path.write_text(srt.compose(condensed), encoding="utf-8")
        logger.info(f"Condensed subtitles from {len(entries)} to {len(condensed)} entries")
        entries = condensed

    if not entries:
        raise ValueError("No valid subtitle entries found to translate")

    # Keyword arguments are required here: translate_entries takes 'model' as its
    # FIRST positional parameter, so passing the entries positionally raises
    # "TypeError: got multiple values for argument 'model'".
    translated = translate_entries(model=model, entries=entries, lang=lang)

    # wrap the translated version as well
    # NOTE(review): combine_entries also merges neighbouring entries again, not
    # only re-wraps lines - confirm that a second merge pass is intended.
    if condense:
        translated = combine_entries(translated, max_entries=condense)

    save_path.write_text(srt.compose(translated), encoding="utf-8")
def parse_args() -> argparse.Namespace: | |
p = argparse.ArgumentParser( | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
description=""" | |
Translate subtitles from SRT file using OpenRouter API. | |
This script reads an SRT file, translates the subtitles to the specified language, | |
and saves the translated subtitles to a new SRT file. | |
""", | |
) | |
p.add_argument("input", type=Path, help="Input SRT file") | |
p.add_argument("output", type=Path, help="Output SRT file") | |
p.add_argument("--translate", type=str, help="Language code for translation") | |
p.add_argument( | |
"--model", | |
type=str, | |
default="google/gemini-2.5-flash", | |
help="Model name for translation", | |
) | |
p.add_argument( | |
"--condense", | |
type=int, | |
help="Merge adjacent short subtitles in groups of N if close together", | |
) | |
return p.parse_args() | |
def main():
    """Script entry point: set up logging, read the CLI options and run the translation."""
    logging.basicConfig(
        format="%(name)s: %(asctime)s %(levelname)s: %(message)s",
        level=logging.INFO,
    )
    # Only warnings and above from httpx, so per-request lines do not flood the output.
    logging.getLogger("httpx").setLevel(logging.WARNING)

    options = parse_args()
    translate(
        subtitle_path=options.input,
        lang=options.translate,
        save_path=options.output,
        model=options.model,
        condense=options.condense,
    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Prerequisites
export OPENROUTER_API_KEY=your_api_key
Basic Usage
Translate an SRT file to another language:
Options
--model MODEL
Specify translation model (default: google/gemini-2.5-flash).
--condense N
Merge adjacent short subtitles in groups of N if close together.
Example
This translates movie.srt to German, condensing adjacent short subtitles in pairs.