Last active
October 11, 2024 14:58
-
-
Save nfedyashev/296a469923e00887d8b1ba0457941da0 to your computer and use it in GitHub Desktop.
python3 youtube.py --url "$YOUTUBE_URL" --language en --output "transcription.txt"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import math
import os
import shutil
import sys
import tempfile

import openai
import yt_dlp
from pydub import AudioSegment
from tqdm import tqdm
def download_youtube_audio(url, output_dir):
    """
    Download the audio track of a YouTube video with yt-dlp and convert it to MP3.

    Args:
        url: URL of the video to download.
        output_dir: Directory that receives the downloaded file.

    Returns:
        Path of the converted file, ``<output_dir>/downloaded_audio.mp3``.

    Raises:
        FileNotFoundError: If the expected MP3 file does not exist once
            yt-dlp has finished.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(output_dir, 'downloaded_audio.%(ext)s'),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'quiet': True,
        'no_warnings': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        downloaded_name = ydl.prepare_filename(info_dict)

    # The name built from the output template may carry a different extension
    # than the file the MP3 post-processor leaves behind, so swap it for
    # ".mp3" before checking that the file exists.
    base, _ = os.path.splitext(downloaded_name)
    audio_file = f"{base}.mp3"
    if not os.path.isfile(audio_file):
        raise FileNotFoundError(f"Expected audio file {audio_file} not found.")
    return audio_file
def split_audio(audio_path, chunk_length_ms, output_dir=None):
    """
    Split an audio file into consecutive MP3 chunks of a fixed duration.

    The chunks are named ``chunk_1.mp3``, ``chunk_2.mp3``, ... and are written
    into ``output_dir``. When ``output_dir`` is omitted they are placed in the
    directory that contains ``audio_path``, so a source file living in a
    temporary directory gets its chunks cleaned up together with it instead
    of leaving them behind in the current working directory.

    Args:
        audio_path: Path of the audio file to split.
        chunk_length_ms: Length of each chunk in milliseconds. The last chunk
            may be shorter. The caller in this file passes 900000
            (15 minutes); pick a value that keeps each exported chunk within
            the size limit of the transcription service.
        output_dir: Optional directory for the chunk files.

    Returns:
        List of chunk file paths in playback order.

    Raises:
        ValueError: If ``chunk_length_ms`` is not positive.
    """
    # Validate up front: range() would raise an unrelated error for a zero
    # step and would silently yield no chunks for a negative one.
    if chunk_length_ms <= 0:
        raise ValueError(
            f"chunk_length_ms must be positive, got {chunk_length_ms!r}"
        )
    if output_dir is None:
        output_dir = os.path.dirname(audio_path)

    audio = AudioSegment.from_file(audio_path)
    chunks = []
    starts = range(0, len(audio), chunk_length_ms)
    for index, start in enumerate(starts, start=1):
        chunk_filename = os.path.join(output_dir, f"chunk_{index}.mp3")
        audio[start:start + chunk_length_ms].export(chunk_filename, format="mp3")
        chunks.append(chunk_filename)
    return chunks
def transcribe_audio_chunks(chunks, language):
    """
    Transcribe each audio chunk with OpenAI's ``whisper-1`` model.

    Transcription is best-effort: when the request for a chunk fails, the
    error is printed and an empty string is recorded for that chunk, so the
    result always holds exactly one entry per input chunk, in order.

    Args:
        chunks: Iterable of paths to the audio files to transcribe.
        language: Language code forwarded to the API (e.g. ``en``).

    Returns:
        List of transcribed texts, one per chunk.
    """
    # NOTE(review): openai.Audio.transcribe is the interface of the pre-1.0
    # openai SDK - confirm the installed version still provides it.
    transcriptions = []
    for chunk in tqdm(chunks, desc="Transcribing"):
        with open(chunk, 'rb') as audio_file:
            try:
                transcript = openai.Audio.transcribe(
                    file=audio_file,
                    model="whisper-1",
                    language=language,
                )
                text = transcript['text']
            except Exception as e:
                # Deliberately broad: one failed chunk must not abort the
                # remaining ones.
                print(f"Error transcribing {chunk}: {e}")
                text = ""
        transcriptions.append(text)
    return transcriptions
def cleanup_files(files):
    """
    Remove temporary files on a best-effort basis.

    A file that no longer exists counts as already cleaned up and is skipped
    silently. Any other failure is printed and the remaining files are still
    processed; this function never raises for a file it cannot delete.

    Args:
        files: Iterable of file paths to delete.
    """
    for path in files:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing left to delete for this entry.
            continue
        except Exception as e:
            print(f"Could not delete {path}: {e}")
def main():
    """
    Command-line entry point: download, split, transcribe and save.

    Reads ``--url``, ``--language`` and ``--output`` from the command line and
    the API key from the ``OPENAI_API_KEY`` environment variable. Exits with
    status 1 when the key is missing or when any processing step fails.
    """
    parser = argparse.ArgumentParser(description="YouTube to Text using yt-dlp and OpenAI Whisper.")
    parser.add_argument('--url', type=str, required=True, help='YouTube video URL')
    parser.add_argument('--language', type=str, required=True, help='Language code (e.g., en, es, fr)')
    parser.add_argument('--output', type=str, default='transcription.txt', help='Output text file name')
    args = parser.parse_args()

    youtube_url = args.url
    language = args.language
    output_file = args.output

    # Check for OpenAI API key
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        print("Error: OPENAI_API_KEY environment variable not set.")
        sys.exit(1)
    openai.api_key = openai_api_key

    # The downloaded audio lives in a temporary directory that is removed
    # automatically when the ``with`` block exits.
    with tempfile.TemporaryDirectory() as tmpdir:
        # Tracked outside the try block so that the cleanup below also works
        # when a failure happens before any chunk has been created.
        chunks = []
        try:
            print("Downloading audio from YouTube...")
            audio_output = download_youtube_audio(youtube_url, tmpdir)
            print(f"Audio downloaded to {audio_output}")

            print("Splitting audio into chunks...")
            # Adjust chunk_length_ms based on your needs and API limits
            chunks = split_audio(audio_output, chunk_length_ms=900000)  # 15 minutes
            print(f"Total chunks created: {len(chunks)}")

            print("Starting transcription...")
            transcriptions = transcribe_audio_chunks(chunks, language)

            print("Aggregating transcriptions...")
            full_transcription = "\n".join(transcriptions)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(full_transcription)
            print(f"Transcription saved to {output_file}")
        except Exception as e:
            print(f"An error occurred: {e}")
            sys.exit(1)
        finally:
            print("Cleaning up temporary files...")
            # Delete the chunk files explicitly: they are not guaranteed to
            # live inside tmpdir, so the context manager alone may miss them.
            cleanup_files(chunks)
            print("Cleanup completed.")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
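pip3 install pydub
pip3 install yt-dlp
pip3 install tqdm "openai<1.0"
(tempfile is part of the Python standard library and does not need to be installed. The script calls openai.Audio.transcribe, which is only available in openai releases before 1.0. pydub and yt-dlp's MP3 conversion also need ffmpeg available on PATH.)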