Skip to content

Instantly share code, notes, and snippets.

@EvilFreelancer
Created January 25, 2025 11:49
Show Gist options
  • Save EvilFreelancer/c60500688b5c5196f4de0b86d2216402 to your computer and use it in GitHub Desktop.
Save EvilFreelancer/c60500688b5c5196f4de0b86d2216402 to your computer and use it in GitHub Desktop.
Небольшой скрипт для транскрипции аудио через whisper.cpp с чанкингом по 60 секунд
import os
import math
import srt
import fire
import requests
import subprocess
from tqdm import tqdm
from datetime import timedelta
def get_audio_length(input_file):
    """Return the duration of *input_file* in whole seconds, rounded up.

    The duration is read from the container-level ``format=duration`` entry
    reported by ``ffprobe``. ``subprocess.CalledProcessError`` propagates if
    ffprobe exits with a non-zero status.
    """
    probe_args = ["ffprobe"]
    # Only report real errors so stdout carries nothing but the value.
    probe_args += ["-v", "error"]
    probe_args += ["-show_entries", "format=duration"]
    # Strip the section wrapper and the key name from the output.
    probe_args += ["-of", "default=noprint_wrappers=1:nokey=1"]
    probe_args.append(input_file)

    raw_duration = subprocess.check_output(probe_args)
    seconds = float(raw_duration.strip())
    return math.ceil(seconds)
def extract_chunk(input_file, start_sec, duration, temp_file):
    """Write ``duration`` seconds of *input_file*, starting at ``start_sec``, to *temp_file*.

    The streams are copied as-is (``-c copy``), so nothing is re-encoded.
    ffmpeg's output is captured rather than printed; a non-zero exit status
    raises ``subprocess.CalledProcessError``.
    """
    ffmpeg_args = ["ffmpeg", "-y"]  # -y: overwrite temp_file if it exists
    ffmpeg_args.extend(["-ss", str(start_sec)])
    ffmpeg_args.extend(["-t", str(duration)])
    ffmpeg_args.extend(["-i", input_file])
    ffmpeg_args.extend(["-c", "copy"])
    ffmpeg_args.append(temp_file)

    subprocess.run(ffmpeg_args, capture_output=True, check=True)
def process_chunk_ffmpeg(input_file, start_sec, end_sec, server_url, language):
    """Extract one chunk with ffmpeg, send it to the Whisper server, parse the SRT.

    Args:
        input_file: Path to the source audio file.
        start_sec: Chunk start, in seconds from the beginning of the file.
        end_sec: Chunk end, in seconds from the beginning of the file.
        server_url: Base URL of the server; ``/inference`` is appended.
        language: Language code forwarded to the server in the form data.

    Returns:
        A list of ``srt.Subtitle`` objects whose timestamps are shifted by
        ``start_sec`` so they are relative to the whole file. An empty list
        is returned (and a message printed) when the server answers with a
        non-200 status or the response cannot be parsed as SRT.

    Errors raised by ffmpeg or by the HTTP request itself propagate to the
    caller, but the temporary chunk file is removed in every case.
    """
    basename, dotted_ext = os.path.splitext(input_file)
    ext = dotted_ext.lstrip('.')
    temp_name = f"{basename}_{start_sec}_{end_sec}.{ext}"
    duration = end_sec - start_sec

    # Tolerate a trailing slash in server_url so the path never becomes
    # "//inference".
    endpoint = f"{server_url.rstrip('/')}/inference"

    try:
        extract_chunk(input_file, start_sec, duration, temp_name)
        with open(temp_name, 'rb') as f:
            files = {'file': f}
            data = {
                'language': language,
                'response_format': 'srt'
            }
            resp = requests.post(endpoint, files=files, data=data)
    finally:
        # Always clean up the chunk, even if extraction or upload failed;
        # the file may not exist if ffmpeg never created it.
        try:
            os.remove(temp_name)
        except FileNotFoundError:
            pass

    if resp.status_code != 200:
        print(f"Error {resp.status_code}: {resp.text}")
        return []

    try:
        subs = list(srt.parse(resp.text))
    except Exception as e:
        # Best effort: an unparsable response costs only this chunk.
        print(f"Error parsing SRT: {e}")
        return []

    # Apply the chunk's time offset to every subtitle.
    offset = timedelta(seconds=start_sec)
    return [
        srt.Subtitle(
            index=sub.index,
            start=sub.start + offset,
            end=sub.end + offset,
            content=sub.content,
            proprietary=sub.proprietary
        )
        for sub in subs
    ]
def merge_subs(subs_list):
    """Flatten several subtitle lists, order them by start time, return SRT text.

    ``subs_list`` is an iterable of lists of subtitle objects; the combined
    sequence is sorted on each subtitle's ``start`` and passed to
    ``srt.compose``.
    """
    flattened = [subtitle for chunk_subs in subs_list for subtitle in chunk_subs]
    ordered = sorted(flattened, key=lambda subtitle: subtitle.start)
    return srt.compose(ordered)
def transcribe_long_audio(
    input_file,
    server_url='http://gpu02:9000',
    chunk_sec=60,
    out_srt=None,
    language='ru'
):
    """Split audio into ``chunk_sec`` pieces, transcribe each, and write one merged SRT.

    Args:
        input_file: Path to the audio file to transcribe.
        server_url: Base URL of the Whisper server.
        chunk_sec: Length of each chunk in seconds; must be positive.
        out_srt: Output path. Defaults to ``input_file`` with its extension
            replaced by ``.srt``.
        language: Language code forwarded to the server.

    Raises:
        ValueError: If ``chunk_sec`` is not greater than zero.
    """
    # A zero or negative chunk length would keep the segmentation loop below
    # from ever reaching total_sec, so reject it before doing any work.
    if chunk_sec <= 0:
        raise ValueError(f"chunk_sec must be positive, got {chunk_sec!r}")

    if out_srt is None:
        out_srt = os.path.splitext(input_file)[0] + '.srt'

    total_sec = get_audio_length(input_file)

    # Build consecutive (start, end) windows; the last one is clipped to the
    # total length.
    segments = []
    start = 0
    while start < total_sec:
        end = min(start + chunk_sec, total_sec)
        segments.append((start, end))
        start = end

    all_subs = [
        process_chunk_ffmpeg(input_file, start_sec, end_sec, server_url, language)
        for start_sec, end_sec in tqdm(segments, desc="Processing chunks")
    ]

    final_srt = merge_subs(all_subs)
    with open(out_srt, 'w', encoding='utf-8') as f:
        f.write(final_srt)
    print(f"Done! Subtitles saved to {out_srt}")
# Script entry point: expose transcribe_long_audio as a command-line
# interface via python-fire when this file is run directly.
if __name__ == "__main__":
    fire.Fire(transcribe_long_audio)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment