import argparse |
import os |
import re |
import string |
import warnings |
import isodate |
import whisper |
from googleapiclient.discovery import build |
from googleapiclient.errors import HttpError |
from langchain import OpenAI, PromptTemplate |
from langchain.chains.summarize import load_summarize_chain |
from langchain.docstore.document import Document |
from pydub import AudioSegment |
from pytube import YouTube |
# Your API key goes here |
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") |
# set up prompts here for formatting |
map_prompt_template = """ |
The following is the transcript of a video. Please provide a brief summary of the video, including the main points and key takeaways. Output should be as a markdown outline. |
{text} |
""" |
MAP_PROMPT = PromptTemplate(template=map_prompt_template, input_variables=["text"]) |
combine_prompt_template = """Here are a few markdown outlines of the video. Please combine them into a single outline. |
{text} |
""" |
COMBINE_PROMPT = PromptTemplate( |
template=combine_prompt_template, input_variables=["text"] |
) |
def create_summary_filename(video_title, channel_title): |
valid_chars = f"-_.() {string.ascii_letters}{string.digits}" |
safe_video_title = ( |
"".join(c for c in video_title if c in valid_chars).strip().replace(" ", "_") |
) |
safe_channel_title = ( |
"".join(c for c in channel_title if c in valid_chars).strip().replace(" ", "_") |
) |
filename = f"summaries/{safe_channel_title}_{safe_video_title}.md" |
return filename |
# Get video ID from the URL |
def get_video_id(url): |
video_id = None |
pattern = re.compile( |
r"(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)" |
) |
match = pattern.match(url) |
if match: |
video_id = match.group(4) |
return video_id |
# Get video details using YouTube Data API v3 |
def get_video_details(video_id): |
try: |
youtube = build("youtube", "v3", developerKey=GOOGLE_API_KEY) |
response = ( |
youtube.videos().list(part="snippet,contentDetails", id=video_id).execute() |
) |
return response["items"][0] if response["items"] else None |
except HttpError as e: |
print(f"An error occurred: {e}") |
return None |
# Convert ISO 8601 duration to a human-readable format |
def format_duration(duration): |
parsed_duration = isodate.parse_duration(duration) |
total_seconds = int(parsed_duration.total_seconds()) |
hours, remainder = divmod(total_seconds, 3600) |
minutes, seconds = divmod(remainder, 60) |
return f"{hours:02d}:{minutes:02d}:{seconds:02d}" |
def generate_unique_filename(video_id, prefix, extension): |
return f"{prefix}/{video_id}.{extension}" |
def transcribe_audio(video_id, video_url): |
# Create directories for audio_streams and transcriptions if they don't exist |
os.makedirs("audio_streams", exist_ok=True) |
os.makedirs("transcriptions", exist_ok=True) |
transcription_filename = generate_unique_filename(video_id, "transcriptions", "txt") |
if os.path.exists(transcription_filename): |
with open(transcription_filename, "r") as transcription_file: |
transcription = transcription_file.read() |
else: |
# Download the video as audio |
yt = YouTube(video_url) |
video = yt.streams.filter(only_audio=True).first() |
audio_filename = generate_unique_filename(video_id, "audio_streams", "mp4") |
file_name = video.download(filename=audio_filename) |
# Convert the audio file to WAV format |
audio = AudioSegment.from_file(file_name) |
audio.export("audio.wav", format="wav") |
# Load the Whisper ASR model |
with warnings.catch_warnings(): |
warnings.simplefilter("ignore", category=UserWarning) |
model = whisper.load_model("base") |
# Transcribe the audio |
result = model.transcribe("audio.wav") |
transcription = result["text"] |
# Save the transcription to a file |
with open(transcription_filename, "w") as transcription_file: |
transcription_file.write(transcription) |
# Cleanup |
os.remove("audio.wav") |
return transcription |
def split_text_to_documents(text, max_length=4096, overlap=100): |
tokens = text.split() |
text_chunks = [] |
current_chunk = [] |
current_length = 0 |
for token in tokens: |
if current_length + len(token) + 1 > max_length - overlap: |
text_chunks.append(" ".join(current_chunk)) |
current_chunk = current_chunk[-overlap:] |
current_length = sum(len(t) + 1 for t in current_chunk) |
current_chunk.append(token) |
current_length += len(token) + 1 |
if current_chunk: |
text_chunks.append(" ".join(current_chunk)) |
return [Document(page_content=t) for t in text_chunks] |
# Main function |
def main(args): |
if args.url: |
url = args.url |
else: |
url = input("Please enter a YouTube video URL: ") |
video_id = get_video_id(url) |
if not video_id: |
print("Invalid YouTube URL") |
return |
embed_url = f"https://www.youtube.com/embed/{video_id}" |
embed_code = f'<iframe width="560" height="315" src="{embed_url}" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>' |
video_details = get_video_details(video_id) |
if not video_details: |
print("Could not fetch video details") |
return |
snippet = video_details["snippet"] |
content_details = video_details["contentDetails"] |
title = snippet["title"] |
description = snippet["description"] |
channel_title = snippet["channelTitle"] |
length = format_duration(content_details["duration"]) |
published_at = snippet["publishedAt"] |
markdown_block = f""" |
{embed_code} |
## {title} |
**Channel**: {channel_title} |
**Published**: {published_at} |
**Length**: {length} |
**Description**: |
{description} |
""" |
print(markdown_block) |
if args.transcribe: |
transcription = transcribe_audio(video_id, url) |
if args.summary: |
llm = OpenAI(temperature=0) |
# Split the transcription into smaller chunks as Documents |
docs = split_text_to_documents(transcription) |
# Choose a chain type for summarization |
chain = load_summarize_chain( |
llm, |
chain_type="map_reduce", |
map_prompt=MAP_PROMPT, |
combine_prompt=COMBINE_PROMPT, |
) |
# Run the summarization chain |
summary = chain.run(docs) |
# Print or store the summary |
print(summary) |
output_filename = create_summary_filename(title, channel_title) |
os.makedirs("summaries", exist_ok=True) |
with open(output_filename, "w") as output_file: |
output_file.write(markdown_block) |
if args.transcribe: |
transcription = transcribe_audio(video_id, url) |
if args.summary: |
output_file.write(f"\n\n{summary}") |
print(f"Summary saved to {output_filename}") |
if __name__ == "__main__": |
parser = argparse.ArgumentParser( |
description="Fetch YouTube video information and generate a markdown block" |
) |
parser.add_argument("-u", "--url", help="YouTube video URL") |
parser.add_argument( |
"-t", "--transcribe", action="store_true", help="Transcribe the video audio" |
) |
parser.add_argument( |
"-s", "--summary", action="store_true", help="Summarize the video transcript" |
) |
args = parser.parse_args() |
main(args) |
Getting the following error:
Traceback (most recent call last):
File "/Users/justinwinter/projects/yt-video-summarizer/yt_summarize.py", line 241, in
File "/Users/justinwinter/projects/yt-video-summarizer/yt_summarize.py", line 193, in main
transcription = transcribe_audio(video_id, url)
File "/Users/justinwinter/projects/yt-video-summarizer/yt_summarize.py", line 103, in transcribe_audio
video = yt.streams.filter(only_audio=True).first()
File "/opt/homebrew/Caskroom/miniconda/base/envs/yt-summarizer/lib/python3.9/site-packages/pytube/main.py", line 296, in streams
return StreamQuery(self.fmt_streams)
File "/opt/homebrew/Caskroom/miniconda/base/envs/yt-summarizer/lib/python3.9/site-packages/pytube/main.py", line 176, in fmt_streams
stream_manifest = extract.apply_descrambler(self.streaming_data)
File "/opt/homebrew/Caskroom/miniconda/base/envs/yt-summarizer/lib/python3.9/site-packages/pytube/main.py", line 161, in streaming_data
return self.vid_info['streamingData']
KeyError: 'streamingData'