@keithchambers
Created January 28, 2025 03:34
YouTube Transcript Downloader
usage: yt-transcript-downloader.py [-h] [-k KEY] [-v VIDEO] [-c CHANNEL] [-p PLAYLIST] [-s SEARCH] [-o {relevance,date,rating,views}] [-m MAX_SIZE] [-a AGE]
Download YouTube transcripts and metadata in YAML format,
with an optional max YAML file size limit.
options:
  -h, --help            show this help message and exit
  -k KEY, --key KEY     YouTube API key to use. Overrides the YOUTUBE_API_KEY
                        env variable if set.
  -v VIDEO, --video VIDEO
                        YouTube Video ID or URL
  -c CHANNEL, --channel CHANNEL
                        YouTube Channel handle (@channelName), URL, or ID
  -p PLAYLIST, --playlist PLAYLIST
                        YouTube Playlist ID or URL
  -s SEARCH, --search SEARCH
                        Search query string
  -o {relevance,date,rating,views}, --order {relevance,date,rating,views}
                        Order search results by relevance/date/rating/views
                        (default: relevance)
  -m MAX_SIZE, --max-size MAX_SIZE
                        Max YAML file size in MB (default: 0.5). Use -1 for
                        no limit. Must be >= 0.1 or -1.
  -a AGE, --age AGE     Filter videos by maximum age in days (optional).
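
Example usage (illustrative invocations; the video URL, channel handle, and
search query below are placeholders):

  export YOUTUBE_API_KEY="<your key>"
  ./yt-transcript-downloader.py -v https://www.youtube.com/watch?v=dQw4w9WgXcQ
  ./yt-transcript-downloader.py -c @exampleChannel -a 30
  ./yt-transcript-downloader.py -s "spaced repetition" -o date -m 2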

Output file schema:

  video_url:
    title: <video title>
    published: <publication date>
    likes: <number of likes>
    comments: <number of comments>
    description: <video description>
    tags:
      - tag1
      - tag2
    transcript: <transcript text>
    views: <number of views>
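
A single entry in the output file might therefore look like this (all values
are invented for illustration):

  https://www.youtube.com/watch?v=dQw4w9WgXcQ:
    title: Example Video Title
    published: 2025-01-15
    likes: 120
    comments: 8
    description: A one-line example description
    tags:
      - example
      - demo
    transcript: hello everyone and welcome back ...
    views: 4521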
#!/usr/bin/env python3
import argparse
import os
import sys
import logging
from typing import List, Dict, Any
from urllib.parse import urlparse, parse_qs
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, VideoUnavailable, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta, timezone
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("yt_transcript_downloader")
# Constants
DEFAULT_ORDER = "relevance"
ORDER_OPTIONS = ["relevance", "date", "rating", "views"]

def validate_max_size(value: str) -> float:
    """
    Validates the --max-size argument:
    - Must be either -1 or >= 0.1
    - Accepts at most 1 decimal place (though we won't strictly fail if there's no decimal).
    """
    try:
        float_val = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid float value: {value}")
    # Check decimal places
    if "." in value:
        decimal_part = value.split(".", 1)[1]
        if len(decimal_part) > 1:  # more than 1 decimal place
            raise argparse.ArgumentTypeError(
                "Value must have at most 1 decimal place."
            )
    # Check range
    if float_val != -1 and float_val < 0.1:
        raise argparse.ArgumentTypeError(
            "Value must be either -1 (no limit) or >= 0.1"
        )
    return float_val

# Helper: Extract Video ID More Robustly
def extract_video_id(url_or_id: str) -> str:
    parsed_url = urlparse(url_or_id)
    if parsed_url.hostname in ["www.youtube.com", "youtube.com"]:
        if parsed_url.path == "/watch":
            query = parse_qs(parsed_url.query)
            video_id = query.get("v")
            if video_id:
                return video_id[0]
        elif parsed_url.path.startswith("/embed/"):
            return parsed_url.path.split("/embed/")[1]
        elif parsed_url.path.startswith("/v/"):
            return parsed_url.path.split("/v/")[1]
    elif parsed_url.hostname == "youtu.be":
        return parsed_url.path.lstrip("/")
    else:
        # A bare ID: exactly 11 chars from the typical YouTube ID alphabet
        if len(url_or_id) == 11 and all(c.isalnum() or c in "-_" for c in url_or_id):
            return url_or_id
    # Raise for every unmatched case, including unrecognized YouTube URL paths
    raise ValueError(f"Cannot extract video ID from '{url_or_id}'")

# Helper: Extract Playlist ID More Robustly
def extract_playlist_id(url_or_id: str) -> str:
    parsed_url = urlparse(url_or_id)
    # youtube.com and youtu.be URLs both carry the playlist in the 'list' param
    if parsed_url.hostname in ["www.youtube.com", "youtube.com", "youtu.be"]:
        query = parse_qs(parsed_url.query)
        playlist_id = query.get("list")
        if playlist_id:
            return playlist_id[0]
    # Fallback check if the user passed a raw ID-like string
    if len(url_or_id) >= 10 and all(c.isalnum() or c in "-_" for c in url_or_id):
        return url_or_id
    raise ValueError(f"Cannot extract playlist ID from '{url_or_id}'")

# Helper: Resolve channel handle, URL, or ID and get the channel's uploads playlist
def resolve_channel_uploads_playlist_id(api_key: str, channel_str: str) -> Dict[str, str]:
    """
    Given a channel handle, URL, or ID (like @conorneill, https://www.youtube.com/@handle,
    or UC2ap7sGyXh1-d6O3B9o9PJA), return a dict with 'channel_id', 'channel_name',
    and 'uploads_playlist_id'.
    """
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    # Check if channel_str is a URL
    parsed_url = urlparse(channel_str)
    if parsed_url.scheme in ["http", "https"]:
        if parsed_url.hostname in ["www.youtube.com", "youtube.com"]:
            path_parts = parsed_url.path.strip("/").split("/")
            if path_parts and path_parts[0].startswith("@"):
                handle = path_parts[0][1:]  # remove '@'
                logger.info(f"Resolving channel handle from URL: @{handle}")
            else:
                raise ValueError(f"Unsupported channel URL format: '{channel_str}'")
        else:
            raise ValueError(f"Unsupported channel URL hostname: '{channel_str}'")
    elif channel_str.startswith("@"):
        handle = channel_str[1:]
        logger.info(f"Resolving channel handle: @{handle}")
    else:
        handle = None  # Assume it's a channel ID
    if handle:
        try:
            # Use the search endpoint to find the channel by handle
            search_response = youtube.search().list(
                part="snippet",
                q=handle,
                type="channel",
                maxResults=1
            ).execute()
        except HttpError as e:
            handle_http_error(e)
        if search_response.get("items"):
            channel = search_response["items"][0]
            channel_id = channel["snippet"]["channelId"]
            try:
                # Now get the uploads playlist
                channel_response = youtube.channels().list(
                    part="snippet,contentDetails",
                    id=channel_id
                ).execute()
            except HttpError as e:
                handle_http_error(e)
            if channel_response.get("items"):
                channel_info = channel_response["items"][0]
                return {
                    "channel_id": channel_info["id"],
                    "channel_name": channel_info["snippet"]["title"].replace(" ", "_"),
                    "uploads_playlist_id": channel_info["contentDetails"]["relatedPlaylists"]["uploads"]
                }
        logger.error(f"Could not resolve handle: @{handle}")
        sys.exit(1)
    else:
        # Try using the channel ID directly
        logger.info(f"Resolving channel ID: {channel_str}")
        try:
            resp_id = youtube.channels().list(
                part="id,snippet,contentDetails",
                id=channel_str
            ).execute()
        except HttpError as e:
            handle_http_error(e)
        if resp_id.get("items"):
            channel = resp_id["items"][0]
            return {
                "channel_id": channel["id"],
                "channel_name": channel["snippet"]["title"].replace(" ", "_"),
                "uploads_playlist_id": channel["contentDetails"]["relatedPlaylists"]["uploads"]
            }
        logger.error(f"Could not resolve channel ID: {channel_str}")
        sys.exit(1)

# Helper: Fetch all videos from a channel's uploads playlist
def fetch_uploads_playlist_videos(api_key: str, playlist_id: str) -> List[str]:
    """
    Fetches ALL video IDs from the specified playlist ID (channel's uploads playlist).
    """
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    video_ids = []
    try:
        request = youtube.playlistItems().list(
            part="contentDetails",
            playlistId=playlist_id,
            maxResults=50
        )
        while request:
            response = request.execute()
            for item in response["items"]:
                video_ids.append(item["contentDetails"]["videoId"])
            request = youtube.playlistItems().list_next(request, response)
    except HttpError as e:
        handle_http_error(e)
    logger.info(f"Fetched {len(video_ids)} videos from playlist {playlist_id}")
    return video_ids

# Helper: Fetch video metadata for a single video
def fetch_video_metadata_single(api_key: str, video_id: str) -> Dict[str, Any]:
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    try:
        response = youtube.videos().list(
            part="snippet,statistics",
            id=video_id
        ).execute()
    except HttpError as e:
        handle_http_error(e)
    if not response.get("items"):
        logger.warning(f"Metadata not found for video ID {video_id}. Skipping.")
        return {}
    item = response["items"][0]
    snippet = item.get("snippet", {})
    stats = item.get("statistics", {})
    # Handle 'publishedAt' safely
    published_at = snippet.get("publishedAt")
    if published_at:
        try:
            published = datetime.strptime(
                published_at, "%Y-%m-%dT%H:%M:%SZ"
            ).strftime("%Y-%m-%d")
        except Exception as e:
            logger.warning(f"Error processing published date for video ID {video_id}: {e}")
            published = "unknown"
    else:
        published = "unknown"
    # Construct metadata dictionary
    metadata = {
        "video": f"https://www.youtube.com/watch?v={video_id}",
        "title": snippet.get("title", "No Title"),
        "published": published,
        "likes": int(stats.get("likeCount", 0)),
        "comments": int(stats.get("commentCount", 0)),
        "description": snippet.get("description", ""),
        "tags": snippet.get("tags", []),
        "transcript": "",
        "views": int(stats.get("viewCount", 0)),
    }
    return metadata

# Helper: Fetch transcript for a single video
def fetch_transcript_single(video_id: str) -> str:
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
        transcript_text = " ".join(entry["text"] for entry in transcript)
        logger.info(f"Fetching transcript for video ID: {video_id}... OK")
        return transcript_text
    except (NoTranscriptFound, TranscriptsDisabled, VideoUnavailable):
        logger.info(f"Fetching transcript for video ID: {video_id}... unavailable")
        return ""
    except Exception as e:
        logger.info(f"Fetching transcript for video ID: {video_id}... error: {e}")
        return ""

def format_video_yaml(video: Dict[str, Any]) -> str:
    """
    Returns the YAML text that would be written for a single video in the
    output file. Values are emitted as-is, without YAML quoting or escaping.
    """
    lines = [f"{video['video']}:"]
    for key, value in video.items():
        if key == "video":
            continue  # Already used as the top-level key
        if isinstance(value, str) and "\n" in value:
            # Multiline strings (transcripts, descriptions) become YAML block
            # scalars; block content must be indented deeper than its key
            lines.append(f"  {key}: |\n    " + value.replace("\n", "\n    "))
        elif isinstance(value, list):
            # For tags or other lists
            if not value:
                lines.append(f"  {key}: []")
            else:
                lines.append(f"  {key}:")
                for elem in value:
                    lines.append(f"    - {elem}")
        else:
            lines.append(f"  {key}: {value}")
    return "\n".join(lines) + "\n"

def handle_http_error(e: HttpError):
    error_content = e.content.decode("utf-8") if e.content else ""
    if e.resp.status == 403 and "quotaExceeded" in error_content:
        logger.error("Error: You have exceeded your YouTube Data API quota.")
        logger.error("Please wait for the quota to reset or consider requesting a higher quota.")
    else:
        # Log the decoded body rather than raw bytes
        logger.error(f"HTTP Error {e.resp.status}: {error_content}")
    sys.exit(1)

def main():
    parser = argparse.ArgumentParser(
        description=(
            "Download YouTube transcripts and metadata in YAML format,\n"
            "with an optional max YAML file size limit."
        ),
        epilog=(
            "Output file schema:\n"
            "  video_url:\n"
            "    title: <video title>\n"
            "    published: <publication date>\n"
            "    likes: <number of likes>\n"
            "    comments: <number of comments>\n"
            "    description: <video description>\n"
            "    tags:\n"
            "      - tag1\n"
            "      - tag2\n"
            "    transcript: <transcript text>\n"
            "    views: <number of views>\n"
        ),
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "-k", "--key",
        help="YouTube API key to use. Overrides the YOUTUBE_API_KEY env variable if set."
    )
    parser.add_argument("-v", "--video", help="YouTube Video ID or URL")
    parser.add_argument("-c", "--channel", help="YouTube Channel handle (@channelName), URL, or ID")
    parser.add_argument("-p", "--playlist", help="YouTube Playlist ID or URL")
    parser.add_argument("-s", "--search", help="Search query string")
    parser.add_argument(
        "-o", "--order",
        choices=ORDER_OPTIONS,
        default=DEFAULT_ORDER,
        help="Order search results by relevance/date/rating/views (default: relevance)"
    )
    parser.add_argument(
        "-m", "--max-size",
        type=validate_max_size,
        default=0.5,
        help="Max YAML file size in MB (default: 0.5). Use -1 for no limit. Must be >= 0.1 or -1."
    )
    parser.add_argument(
        "-a", "--age", type=int,
        help="Filter videos by maximum age in days (optional)."
    )
    args = parser.parse_args()
    api_key = args.key or os.getenv("YOUTUBE_API_KEY")
    if not api_key:
        logger.error("API key is not set. Provide it using -k or set the YOUTUBE_API_KEY environment variable.")
        sys.exit(1)
    published_after = None
    if args.age:
        published_after = (datetime.now(timezone.utc) - timedelta(days=args.age)).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Figure out the output file name and prefix
    prefix = ""
    video_ids: List[str] = []
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    if args.video:
        prefix = "video-"
        try:
            video_id = extract_video_id(args.video)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        # Derive the output filename from the video title
        try:
            resp = youtube.videos().list(part="snippet", id=video_id).execute()
        except HttpError as e:
            handle_http_error(e)
        if resp.get("items"):
            video_title = resp["items"][0]["snippet"]["title"].replace(" ", "_")
            output_file = f"{prefix}{video_title}.yaml"
        else:
            logger.error("Video not found.")
            sys.exit(1)
        video_ids = [video_id]
    elif args.channel:
        prefix = "channel-"
        try:
            channel_info = resolve_channel_uploads_playlist_id(api_key, args.channel)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        output_file = f"{prefix}{channel_info['channel_name']}.yaml"
        video_ids = fetch_uploads_playlist_videos(api_key, channel_info["uploads_playlist_id"])
    elif args.playlist:
        prefix = "playlist-"
        try:
            playlist_id = extract_playlist_id(args.playlist)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        try:
            pl_response = youtube.playlists().list(part="snippet", id=playlist_id).execute()
        except HttpError as e:
            handle_http_error(e)
        if not pl_response.get("items"):
            logger.error("Invalid playlist ID or no playlist found.")
            sys.exit(1)
        playlist_name = pl_response["items"][0]["snippet"]["title"].replace(" ", "_")
        output_file = f"{prefix}{playlist_name}.yaml"
        # Gather all video IDs in this playlist
        video_ids = []
        try:
            request = youtube.playlistItems().list(
                part="contentDetails",
                playlistId=playlist_id,
                maxResults=50
            )
            while request:
                response = request.execute()
                for item in response["items"]:
                    video_ids.append(item["contentDetails"]["videoId"])
                request = youtube.playlistItems().list_next(request, response)
        except HttpError as e:
            handle_http_error(e)
    elif args.search:
        # Keep the "search_" filename prefix. Results are no longer capped at
        # a fixed count; we gather as many as the API paging returns.
        search_term_sanitized = args.search.replace(" ", "_")
        output_file = f"search_{search_term_sanitized}.yaml"
        video_ids = []
        try:
            # Only pass publishedAfter when an --age filter was given
            search_params: Dict[str, Any] = {
                "part": "id,snippet",
                "q": args.search,
                "maxResults": 50,
                "type": "video",
                "order": args.order,
            }
            if published_after:
                search_params["publishedAfter"] = published_after
            request = youtube.search().list(**search_params)
            while request:
                search_resp = request.execute()
                for item in search_resp["items"]:
                    video_ids.append(item["id"]["videoId"])
                request = youtube.search().list_next(request, search_resp)
        except HttpError as e:
            handle_http_error(e)
    else:
        logger.error("Provide either --video, --channel, --playlist, or --search.")
        sys.exit(1)
    if not video_ids:
        logger.error("No videos found to process.")
        sys.exit(1)
    # Print the fully qualified path to the output file up front
    full_output_path = os.path.abspath(output_file)
    logger.info(f"Output file: {full_output_path}")
    # Write video blocks to the output file, enforcing the size limit
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            current_size = 0
            if args.channel:
                max_bytes = -1  # No size limit for channel downloads
            else:
                max_bytes = -1 if args.max_size == -1 else int(args.max_size * 1024 * 1024)
            total_videos = len(video_ids)
            processed_videos = 0
            for vid in video_ids:
                # Fetch metadata; skip the video if none was found
                metadata = fetch_video_metadata_single(api_key, vid)
                if not metadata:
                    continue
                # Fetch transcript
                metadata["transcript"] = fetch_transcript_single(vid)
                # Format the YAML block and measure its size in bytes
                video_block = format_video_yaml(metadata)
                additional_bytes = len(video_block.encode("utf-8"))
                # Stop before a block that would push the file past the limit
                if max_bytes != -1 and (current_size + additional_bytes) > max_bytes:
                    logger.info(f"Stopping at video ID {vid}: writing it would exceed the max size of {args.max_size} MB.")
                    break
                f.write(video_block)
                f.flush()  # Ensure data is written to disk
                current_size += additional_bytes
                processed_videos += 1
            logger.info(f"Processed {processed_videos} out of {total_videos} videos.")
            logger.info(f"Data saved to {output_file}")
    except IOError as e:
        logger.error(f"Failed to write to file {output_file}: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
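
The resulting file is plain YAML, so it can be read back with PyYAML. A minimal
sketch, assuming PyYAML is installed and that titles and descriptions contain no
characters that would require YAML quoting (this script writes values unescaped):

import yaml

# Hypothetical filename produced by the earlier example search invocation
with open("search_spaced_repetition.yaml", "r", encoding="utf-8") as f:
    videos = yaml.safe_load(f) or {}

# Each top-level key is a video URL mapping to its metadata and transcript
for url, meta in videos.items():
    print(url, meta["title"], meta["views"])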