Created
April 17, 2025 09:26
-
-
Save code-yeongyu/97532488723a1aa4df66519f891dd738 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| from dataclasses import dataclass | |
| from datetime import timedelta | |
| from pathlib import Path | |
| import ffprobe3 | |
| import srt | |
| from ffmpeg.asyncio import FFmpeg | |
# Shared semaphore acquired around every FFmpeg execution in this module, so
# at most `os.cpu_count()` (or 1 when that is unavailable) run concurrently.
ffmpeg_semaphore = asyncio.Semaphore(os.cpu_count() or 1)
@dataclass
class AudioChunk:
    """One extracted segment of an audio file.

    ``start_time`` and ``end_time`` are offsets into the source audio, and
    ``file_path`` is where the extracted segment is stored.
    """

    start_time: float
    end_time: float
    file_path: Path

    @property
    def duration(self) -> float:
        """Return the segment length, i.e. ``end_time`` minus ``start_time``."""
        begin, finish = self.start_time, self.end_time
        return finish - begin
async def split_audio(
    input_path: Path,
    chunk_size_mb: int = 20,
    output_dir: Path | None = None,
) -> list[AudioChunk]:
    """Split an audio file into equal-duration chunks sized by file size.

    The chunk count is ``file_size // (chunk_size_mb MiB) + 1`` and the total
    duration is divided evenly between the chunks. Each chunk is extracted
    with FFmpeg using stream copy (no re-encoding) and written to
    ``output_dir`` as ``chunk_<i><original suffix>``.

    Args:
        input_path: Audio file to split.
        chunk_size_mb: Target chunk size in MiB; must be positive.
        output_dir: Directory for the chunk files. Defaults to a ``chunks``
            directory next to ``input_path``. Created if missing.

    Returns:
        The chunks in playback order, each with its start/end offset in
        seconds and the path of the extracted file.

    Raises:
        ValueError: If ``chunk_size_mb`` is not positive.
    """
    if chunk_size_mb <= 0:
        raise ValueError(f"chunk_size_mb must be positive, got {chunk_size_mb}")

    output_dir = output_dir or input_path.parent / "chunks"
    # parents=True so a nested, not-yet-existing output directory also works.
    output_dir.mkdir(parents=True, exist_ok=True)

    # ffprobe3.probe is a synchronous call; run it in a worker thread so the
    # event loop is not stalled while the media file is probed.
    probe = await asyncio.to_thread(ffprobe3.probe, str(input_path))
    duration = float(probe.format.duration_secs)  # type: ignore

    # Derive the number of chunks from the file size and the target size.
    file_size = input_path.stat().st_size
    num_chunks = (file_size // (chunk_size_mb * 1024 * 1024)) + 1
    chunk_duration = duration / num_chunks

    async def process_chunk(i: int) -> AudioChunk:
        """Extract the i-th time slice of the input into its own file."""
        start_time = i * chunk_duration
        # Clamp so the last chunk never runs past the end of the audio.
        end_time = min((i + 1) * chunk_duration, duration)
        chunk_path = output_dir / f"chunk_{i}{input_path.suffix}"

        async with ffmpeg_semaphore:
            ffmpeg = (
                FFmpeg()
                .option("y")  # Overwrite output files without asking
                .input(str(input_path), ss=start_time, t=end_time - start_time)
                .output(str(chunk_path), c="copy")  # Stream copy: no re-encode
            )
            await ffmpeg.execute()

        return AudioChunk(
            start_time=start_time,
            end_time=end_time,
            file_path=chunk_path,
        )

    # Process all chunks concurrently; gather preserves submission order.
    return await asyncio.gather(*(process_chunk(i) for i in range(num_chunks)))
def merge_srt_files(chunks: list[tuple[AudioChunk, str]]) -> str:
    """Combine per-chunk SRT texts into one SRT document.

    Each element pairs an audio chunk with the SRT text produced for it. The
    text is normalised with ``clean_result``, every subtitle is shifted by
    the chunk's ``start_time`` (in seconds), numbered consecutively in input
    order, and the whole collection is sorted by start time before being
    composed into a single SRT string.
    """
    merged: list[srt.Subtitle] = []

    for chunk, raw_srt in chunks:
        offset = timedelta(seconds=chunk.start_time)
        parsed = list(srt.parse(clean_result(raw_srt)))

        for entry in parsed:
            # Move chunk-relative timestamps onto the full-audio timeline.
            entry.start = entry.start + offset
            entry.end = entry.end + offset
            entry.index = len(merged) + 1
            merged.append(entry)

    ordered = sorted(merged, key=lambda entry: entry.start)
    return srt.compose(ordered)
def clean_result(result: str) -> str:
    """Normalise raw SRT text before parsing.

    Trims surrounding whitespace, then any surrounding double-quote
    characters, and finally turns literal backslash-n sequences into real
    newlines.
    """
    trimmed = result.strip()
    unquoted = trimmed.strip('"')
    return unquoted.replace("\\n", "\n")
async def convert_to_mp3(input_path: Path, output_dir: Path | None = None) -> Path:
    """Convert a video/audio file to MP3 and return the output path.

    The audio is encoded with ``libmp3lame`` at 192k and any video stream is
    dropped. The result is written to ``<output_dir>/<input stem>.mp3`` and an
    existing file at that path is overwritten.

    Args:
        input_path: Media file to convert.
        output_dir: Destination directory. Defaults to the directory of
            ``input_path``. Created if missing.

    Returns:
        Path of the generated MP3 file.
    """
    output_dir = output_dir or input_path.parent
    # Create the destination up front so FFmpeg does not fail on a missing
    # directory (mirrors how split_audio prepares its output directory).
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{input_path.stem}.mp3"

    ffmpeg_options = {
        "url": str(output_path),
        "acodec": "libmp3lame",
        "ab": "192k",
        "vn": None,  # value-less flag: disable video in the output
    }

    ffmpeg = FFmpeg().option("y").input(str(input_path)).output(**ffmpeg_options)

    # Share the module-wide limit on concurrent FFmpeg executions.
    async with ffmpeg_semaphore:
        await ffmpeg.execute()

    return output_path
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment