Skip to content

Instantly share code, notes, and snippets.

@code-yeongyu
Created April 17, 2025 09:26
Show Gist options
  • Select an option

  • Save code-yeongyu/97532488723a1aa4df66519f891dd738 to your computer and use it in GitHub Desktop.

Select an option

Save code-yeongyu/97532488723a1aa4df66519f891dd738 to your computer and use it in GitHub Desktop.
import asyncio
import os
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
import ffprobe3
import srt
from ffmpeg.asyncio import FFmpeg
# Shared limiter for ffmpeg runs: every ffmpeg.execute() call in this module
# is awaited while holding this semaphore, so at most os.cpu_count() of them
# (or 1 when the CPU count is unavailable) are in flight at the same time.
ffmpeg_semaphore = asyncio.Semaphore(os.cpu_count() or 1)
@dataclass
class AudioChunk:
    """A time slice of a source audio file together with the file holding it."""

    # Offset of the slice start within the source audio, in seconds.
    start_time: float
    # Offset of the slice end within the source audio, in seconds.
    end_time: float
    # Location of the file that contains this slice.
    file_path: Path

    @property
    def duration(self) -> float:
        """Length of the slice in seconds (end offset minus start offset)."""
        length = self.end_time - self.start_time
        return length
async def split_audio(
    input_path: Path,
    chunk_size_mb: int = 20,
    output_dir: Path | None = None,
) -> list[AudioChunk]:
    """Split an audio file into equal-duration chunks sized by a MiB target.

    The number of chunks is derived from the size of ``input_path`` on disk,
    and the total duration is divided evenly between them. Chunk sizes are
    therefore only approximate when the bitrate is not constant.

    Args:
        input_path: Audio file to split.
        chunk_size_mb: Target chunk size in MiB. Must be positive.
        output_dir: Directory that receives the chunk files. Defaults to a
            ``chunks`` directory next to ``input_path``. It is created
            (including missing parents) when it does not exist.

    Returns:
        One ``AudioChunk`` per piece, in chunk-index (start time) order.

    Raises:
        ValueError: If ``chunk_size_mb`` is zero or negative.
    """
    # Reject nonsensical sizes up front instead of failing later with a
    # ZeroDivisionError or a negative chunk count.
    if chunk_size_mb <= 0:
        raise ValueError(f"chunk_size_mb must be positive, got {chunk_size_mb}")

    output_dir = output_dir or input_path.parent / "chunks"
    output_dir.mkdir(parents=True, exist_ok=True)

    # ffprobe3.probe is a plain synchronous call; run it in a worker thread so
    # the event loop is not stalled while the file is being probed.
    probe = await asyncio.to_thread(ffprobe3.probe, str(input_path))
    duration = float(probe.format.duration_secs)  # type: ignore

    # Floor division plus one always yields at least one chunk and keeps the
    # average chunk size strictly below the requested target.
    chunk_bytes = chunk_size_mb * 1024 * 1024
    num_chunks = input_path.stat().st_size // chunk_bytes + 1
    chunk_duration = duration / num_chunks

    async def process_chunk(i: int) -> AudioChunk:
        """Extract the i-th time slice into its own file and describe it."""
        start_time = i * chunk_duration
        # Clamp so the final chunk never extends past the end of the audio.
        end_time = min((i + 1) * chunk_duration, duration)
        chunk_path = output_dir / f"chunk_{i}{input_path.suffix}"

        async with ffmpeg_semaphore:
            ffmpeg = (
                FFmpeg()
                .option("y")  # Overwrite output files without asking
                .input(str(input_path), ss=start_time, t=end_time - start_time)
                .output(str(chunk_path), c="copy")  # Stream copy: no re-encode
            )
            await ffmpeg.execute()

        return AudioChunk(
            start_time=start_time,
            end_time=end_time,
            file_path=chunk_path,
        )

    # Launch every extraction at once; the semaphore bounds how many ffmpeg
    # runs are actually active. gather preserves the submission order.
    chunks: list[AudioChunk] = await asyncio.gather(
        *(process_chunk(i) for i in range(num_chunks))
    )
    return chunks
def merge_srt_files(chunks: list[tuple[AudioChunk, str]]) -> str:
    """Combine per-chunk SRT texts into one SRT document on a shared timeline.

    Each entry pairs a chunk with the SRT text produced for it. Every cue is
    shifted by its chunk's ``start_time`` and numbered consecutively in the
    order it is encountered; the cues are then ordered by start time and
    composed into a single SRT string.
    """
    merged: list[srt.Subtitle] = []

    for chunk, raw_srt in chunks:
        # All cues of one chunk share the same offset into the full recording.
        offset = timedelta(seconds=chunk.start_time)
        parsed_cues = list(srt.parse(clean_result(raw_srt)))

        for cue in parsed_cues:
            cue.start += offset
            cue.end += offset
            # Running 1-based number across all chunks.
            cue.index = len(merged) + 1
            merged.append(cue)

    ordered = sorted(merged, key=lambda cue: cue.start)
    return srt.compose(ordered)
def clean_result(result: str) -> str:
    """Tidy raw SRT text before parsing.

    Removes surrounding whitespace, then any double quotes wrapping the text,
    and finally turns literal backslash-n sequences into real newlines.
    """
    trimmed = result.strip()
    unquoted = trimmed.strip('"')
    return unquoted.replace("\\n", "\n")
async def convert_to_mp3(input_path: Path, output_dir: Path | None = None) -> Path:
"""Convert video/audio file to MP3 format."""
output_dir = output_dir or input_path.parent
output_path = output_dir / f"{input_path.stem}.mp3"
# Check if running on macOS
ffmpeg_options = {
"url": str(output_path),
"acodec": "libmp3lame",
"ab": "192k",
"vn": None,
}
ffmpeg = FFmpeg().option("y").input(str(input_path)).output(**ffmpeg_options)
async with ffmpeg_semaphore:
await ffmpeg.execute()
return output_path
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment