Created
January 28, 2025 03:34
-
-
Save keithchambers/8b328f62a99b9daa27235955ddfeeeb5 to your computer and use it in GitHub Desktop.
YouTube Transcript Downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
usage: yt-transcript-downloader.py [-h] [-k KEY] [-v VIDEO] [-c CHANNEL] [-p PLAYLIST] [-s SEARCH] [-o {relevance,date,rating,views}] [-m MAX_SIZE] [-a AGE]

Download YouTube transcripts and metadata in YAML format,
with an optional max YAML file size limit.

options:
  -h, --help            show this help message and exit
  -k KEY, --key KEY     YouTube API Key to use. Overrides YOUTUBE_API_KEY env variable if not set.
  -v VIDEO, --video VIDEO
                        YouTube Video ID or URL
  -c CHANNEL, --channel CHANNEL
                        YouTube Channel handle (@channelName), URL, or ID
  -p PLAYLIST, --playlist PLAYLIST
                        YouTube Playlist ID or URL
  -s SEARCH, --search SEARCH
                        Search query string
  -o {relevance,date,rating,views}, --order {relevance,date,rating,views}
                        Order search results by relevance/date/rating/views (default: relevance)
  -m MAX_SIZE, --max-size MAX_SIZE
                        Max YAML file size in MB (default: 0.5). Use -1 for no limit. Must be >= 0.1 or -1.
  -a AGE, --age AGE     Filter videos by maximum age in days (optional).

Output file schema:
video_url:
  title: <video title>
  published: <publication date>
  likes: <number of likes>
  comments: <number of comments>
  description: <video description>
  tags:
    - tag1
    - tag2
  transcript: <transcript text>
  views: <number of views>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""Download YouTube transcripts and video metadata into a hand-rolled YAML file."""
import argparse
import os
import sys
import logging
from typing import List, Dict, Any
from urllib.parse import urlparse, parse_qs
# Third-party: transcript scraping and the official YouTube Data API client.
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, VideoUnavailable, NoTranscriptFound
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta, timezone
# Configure logging: bare "%(message)s" format so output reads like plain status text.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("yt_transcript_downloader")
# Constants
DEFAULT_ORDER = "relevance"  # default value for the --order flag
ORDER_OPTIONS = ["relevance", "date", "rating", "views"]  # valid --order choices
def validate_max_size(value: str) -> float:
    """
    argparse type-checker for --max-size.

    Accepts -1 (the "no limit" sentinel) or any value >= 0.1 written with at
    most one digit after the decimal point; raises ArgumentTypeError otherwise.
    """
    try:
        parsed = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid float value: {value}")
    # Reject more than one digit after the decimal point (checked on the raw
    # string, so "0.50" fails even though it equals 0.5).
    if "." in value and len(value.split(".", 1)[1]) > 1:
        raise argparse.ArgumentTypeError(
            "Value must have at most 1 decimal place."
        )
    # Anything below 0.1 is rejected unless it is exactly the -1 sentinel.
    if parsed < 0.1 and parsed != -1:
        raise argparse.ArgumentTypeError(
            "Value must be either -1 (no limit) or >= 0.1"
        )
    return parsed
# Helper: Extract Video ID More Robustly
def extract_video_id(url_or_id: str) -> str:
    """
    Extract an 11-character YouTube video ID from a URL or a bare ID.

    Supports /watch?v=, /embed/, /v/, /shorts/ and youtu.be URL forms;
    otherwise accepts an 11-char string drawn from [A-Za-z0-9_-] as a raw ID.

    Raises:
        ValueError: if no video ID can be determined.
        (Bug fix: the original implicitly returned None for youtube.com URLs
        it did not recognize, e.g. /watch without a ?v= parameter.)
    """
    parsed_url = urlparse(url_or_id)
    if parsed_url.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
        if parsed_url.path == "/watch":
            query = parse_qs(parsed_url.query)
            video_id = query.get("v")
            if video_id:
                return video_id[0]
        else:
            # Path-embedded forms: /embed/<id>, /v/<id>, /shorts/<id>
            for path_prefix in ("/embed/", "/v/", "/shorts/"):
                if parsed_url.path.startswith(path_prefix):
                    return parsed_url.path[len(path_prefix):].split("/")[0]
    elif parsed_url.hostname == "youtu.be":
        return parsed_url.path.lstrip("/")
    elif len(url_or_id) == 11 and all(c.isalnum() or c in "-_" for c in url_or_id):
        # Bare ID: exactly 11 chars of typical YouTube ID characters.
        return url_or_id
    # Fail loudly on anything unrecognized instead of falling through to None.
    raise ValueError(f"Cannot extract video ID from '{url_or_id}'")
# Helper: Extract Playlist ID More Robustly
def extract_playlist_id(url_or_id: str) -> str:
    """
    Pull a playlist ID out of a YouTube URL's ?list= parameter, or accept a
    raw ID-like string (>= 10 chars of [A-Za-z0-9_-]).

    Raises:
        ValueError: when neither form matches.
    """
    parsed = urlparse(url_or_id)
    # Both youtube.com and youtu.be carry the playlist in the ?list= parameter.
    if parsed.hostname in ("www.youtube.com", "youtube.com", "youtu.be"):
        list_param = parse_qs(parsed.query).get("list")
        if list_param:
            return list_param[0]
    # Fallback check if the user passed a raw ID-like string
    looks_like_id = len(url_or_id) >= 10 and all(
        ch.isalnum() or ch in "-_" for ch in url_or_id
    )
    if looks_like_id:
        return url_or_id
    raise ValueError(f"Cannot extract playlist ID from '{url_or_id}'")
# Helper: Resolve channel handle, URL, or ID and get the channel's uploads playlist
def resolve_channel_uploads_playlist_id(api_key: str, channel_str: str) -> Dict[str, str]:
    """
    Given a channel handle, URL, or ID (like @conorneill, https://www.youtube.com/@handle, or UC2ap7sGyXh1-d6O3B9o9PJA),
    return a dict with 'channel_id', 'channel_name', and 'uploads_playlist_id'.

    Raises ValueError for unsupported URL shapes; exits the process (via
    handle_http_error or sys.exit) on API errors or unresolvable channels.
    """
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    # Check if channel_str is a URL; only youtube.com/@handle paths are supported.
    parsed_url = urlparse(channel_str)
    if parsed_url.scheme in ["http", "https"]:
        if parsed_url.hostname in ["www.youtube.com", "youtube.com"]:
            path_parts = parsed_url.path.strip("/").split("/")
            if path_parts and path_parts[0].startswith("@"):
                handle = path_parts[0][1:]  # remove '@'
                logger.info(f"Resolving channel handle from URL: @{handle}")
            else:
                raise ValueError(f"Unsupported channel URL format: '{channel_str}'")
        else:
            raise ValueError(f"Unsupported channel URL hostname: '{channel_str}'")
    elif channel_str.startswith("@"):
        handle = channel_str[1:]
        logger.info(f"Resolving channel handle: @{handle}")
    else:
        handle = None  # Assume it's a channel ID
    if handle:
        try:
            # Use the search endpoint to find the channel by handle.
            # NOTE(review): search is a fuzzy match and may resolve to a
            # different channel than the exact handle; channels().list with
            # forHandle= would be exact — confirm before relying on this.
            search_response = youtube.search().list(
                part="snippet",
                q=handle,
                type="channel",
                maxResults=1
            ).execute()
        except HttpError as e:
            handle_http_error(e)  # logs and exits the process
        if search_response.get("items"):
            channel = search_response["items"][0]
            channel_id = channel["snippet"]["channelId"]
            try:
                # Now get the uploads playlist for the resolved channel ID.
                channel_response = youtube.channels().list(
                    part="snippet,contentDetails",
                    id=channel_id
                ).execute()
            except HttpError as e:
                handle_http_error(e)
            if channel_response.get("items"):
                channel_info = channel_response["items"][0]
                # Spaces replaced with '_' so the name is usable in a filename.
                return {
                    "channel_id": channel_info["id"],
                    "channel_name": channel_info["snippet"]["title"].replace(" ", "_"),
                    "uploads_playlist_id": channel_info["contentDetails"]["relatedPlaylists"]["uploads"]
                }
        # Reached when the search returned nothing or the channel lookup was empty.
        logger.error(f"Could not resolve handle: @{handle}")
        sys.exit(1)
    else:
        # Try using channel ID directly
        logger.info(f"Resolving channel ID: {channel_str}")
        try:
            resp_id = youtube.channels().list(
                part="id,snippet,contentDetails",
                id=channel_str
            ).execute()
        except HttpError as e:
            handle_http_error(e)
        if resp_id.get("items"):
            channel = resp_id["items"][0]
            return {
                "channel_id": channel["id"],
                "channel_name": channel["snippet"]["title"].replace(" ", "_"),
                "uploads_playlist_id": channel["contentDetails"]["relatedPlaylists"]["uploads"]
            }
        logger.error(f"Could not resolve channel ID: {channel_str}")
        sys.exit(1)
# Helper: Fetch all videos from a channel's uploads playlist
def fetch_uploads_playlist_videos(api_key: str, playlist_id: str) -> List[str]:
    """
    Return every video ID in the given playlist (a channel's uploads
    playlist), following API pagination until exhausted.
    """
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    collected: List[str] = []
    try:
        page_request = youtube.playlistItems().list(
            part="contentDetails",
            playlistId=playlist_id,
            maxResults=50
        )
        # list_next() yields None once the final page has been consumed.
        while page_request:
            page = page_request.execute()
            collected.extend(entry["contentDetails"]["videoId"] for entry in page["items"])
            page_request = youtube.playlistItems().list_next(page_request, page)
    except HttpError as e:
        handle_http_error(e)  # logs and exits the process
    logger.info(f"Fetched {len(collected)} videos from playlist {playlist_id}")
    return collected
# Helper: Fetch video metadata for a single video
def fetch_video_metadata_single(api_key: str, video_id: str) -> Dict[str, Any]:
    """
    Fetch snippet + statistics for one video via the Data API.

    Returns a dict matching the output-file schema (with an empty
    'transcript' placeholder filled in later by the caller), or {} when
    the video is not found. Exits the process on API errors.
    """
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    try:
        response = youtube.videos().list(
            part="snippet,statistics",
            id=video_id
        ).execute()
    except HttpError as e:
        handle_http_error(e)  # logs and exits the process
    if not response.get("items"):
        logger.warning(f"Metadata not found for video ID {video_id}. Skipping.")
        return {}
    item = response["items"][0]
    snippet = item.get("snippet", {})
    stats = item.get("statistics", {})
    # Handle 'publishedAt' safely: reduce the "%Y-%m-%dT%H:%M:%SZ" timestamp
    # to YYYY-MM-DD, falling back to "unknown" on any parse problem.
    published_at = snippet.get("publishedAt")
    if published_at:
        try:
            published = datetime.strptime(
                published_at, "%Y-%m-%dT%H:%M:%SZ"
            ).strftime("%Y-%m-%d")
        except Exception as e:
            logger.warning(f"Error processing published date for video ID {video_id}: {e}")
            published = "unknown"
    else:
        published = "unknown"
    # Construct metadata dictionary. Counts default to 0 when the API omits
    # them (e.g. hidden like counts).
    metadata = {
        "video": f"https://www.youtube.com/watch?v={video_id}",
        "title": snippet.get("title", "No Title"),
        "published": published,
        "likes": int(stats.get("likeCount", 0)),
        "comments": int(stats.get("commentCount", 0)),
        "description": snippet.get("description", ""),
        "tags": snippet.get("tags", []),
        "transcript": "",  # filled in later by fetch_transcript_single()
        "views": int(stats.get("viewCount", 0)),
    }
    return metadata
# Helper: Fetch transcript for a single video
def fetch_transcript_single(video_id: str) -> str:
    """
    Return the English transcript of a video as one space-joined string,
    or "" when no transcript is available (or any other error occurs).
    """
    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
        joined = " ".join(segment["text"] for segment in segments)
        logger.info(f"Fetching transcript for video ID: {video_id}... OK")
        return joined
    except (NoTranscriptFound, TranscriptsDisabled, VideoUnavailable):
        # Expected outcomes: no captions, captions disabled, video gone.
        logger.info(f"Fetching transcript for video ID: {video_id}... unavailable")
        return ""
    except Exception as e:
        # Best-effort policy: any other failure is logged and treated as
        # "no transcript" rather than aborting the whole run.
        logger.info(f"Fetching transcript for video ID: {video_id}... error: {e}")
        return ""
def format_video_yaml(video: Dict[str, Any]) -> str:
    """
    Render one video's metadata dict as a YAML block keyed by its URL.

    NOTE(review): scalar values are emitted unquoted/unescaped, so a title or
    description containing YAML-special characters (e.g. ':') may produce a
    file that strict YAML parsers reject — confirm whether that matters here.
    """
    out = [f"{video['video']}:"]
    for field, val in video.items():
        if field == "video":
            # The URL already serves as the top-level mapping key.
            continue
        if isinstance(val, str) and "\n" in val:
            # Multiline strings (transcript/description) become literal
            # block scalars with each continuation line indented 4 spaces.
            out.append(f"  {field}: |\n    " + val.replace("\n", "\n    "))
        elif isinstance(val, list):
            # Lists (tags): inline [] when empty, block sequence otherwise.
            if val:
                out.append(f"  {field}:")
                out.extend(f"    - {member}" for member in val)
            else:
                out.append(f"  {field}: []")
        else:
            out.append(f"  {field}: {val}")
    return "\n".join(out) + "\n"
def handle_http_error(e: HttpError):
    """
    Log a YouTube Data API HttpError in a user-friendly way, then exit(1).

    A 403 whose payload mentions quotaExceeded gets a dedicated quota hint;
    every other error (including non-quota 403s) is dumped verbatim.
    """
    body = e.content.decode('utf-8') if e.content else ""
    quota_exhausted = e.resp.status in [403] and "quotaExceeded" in body
    if quota_exhausted:
        logger.error("Error: You have exceeded your YouTube Data API quota.")
        logger.error("Please wait for the quota to reset or consider requesting a higher quota.")
    else:
        # Same verbatim dump the original used for both non-quota branches.
        logger.error(f"HTTP Error {e.resp.status}: {e.content}")
    sys.exit(1)
def main():
    # CLI entry point: exactly one of --video/--channel/--playlist/--search
    # selects the source of video IDs; each branch also decides the output
    # filename. Metadata + transcripts are then streamed to a YAML file,
    # optionally capped at --max-size megabytes.
    parser = argparse.ArgumentParser(
        description="""
        Download YouTube transcripts and metadata in YAML format,
        with an optional max YAML file size limit.
        """,
        epilog="""
        Output file schema:
        video_url:
          title: <video title>
          published: <publication date>
          likes: <number of likes>
          comments: <number of comments>
          description: <video description>
          tags:
            - tag1
            - tag2
          transcript: <transcript text>
          views: <number of views>
        """,
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "-k", "--key",
        help="YouTube API Key to use. Overrides YOUTUBE_API_KEY env variable if not set."
    )
    parser.add_argument("-v", "--video", help="YouTube Video ID or URL")
    parser.add_argument("-c", "--channel", help="YouTube Channel handle (@channelName), URL, or ID")
    parser.add_argument("-p", "--playlist", help="YouTube Playlist ID or URL")
    parser.add_argument("-s", "--search", help="Search query string")
    parser.add_argument(
        "-o", "--order",
        choices=ORDER_OPTIONS,
        default=DEFAULT_ORDER,
        help="Order search results by relevance/date/rating/views (default: relevance)"
    )
    parser.add_argument(
        "-m", "--max-size",
        type=validate_max_size,
        default=0.5,
        help="Max YAML file size in MB (default: 0.5). Use -1 for no limit. Must be >= 0.1 or -1."
    )
    parser.add_argument(
        "-a", "--age", type=int,
        help="Filter videos by maximum age in days (optional)."
    )
    args = parser.parse_args()
    # CLI flag takes precedence over the environment variable.
    api_key = args.key or os.getenv("YOUTUBE_API_KEY")
    if not api_key:
        logger.error("API key is not set. Provide it using -k or set the YOUTUBE_API_KEY environment variable.")
        sys.exit(1)
    # RFC3339 cutoff derived from --age.
    # NOTE(review): only the --search branch forwards this to the API;
    # --video/--channel/--playlist silently ignore --age — confirm intended.
    published_after = None
    if args.age:
        published_after = (datetime.now(timezone.utc) - timedelta(days=args.age)).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Figure out the output file name and prefix
    prefix = ""
    video_ids: List[str] = []
    youtube = build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    if args.video:
        prefix = "video-"
        try:
            video_id = extract_video_id(args.video)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        # Derive output filename from the video title.
        # NOTE(review): only spaces are replaced with '_'; characters such as
        # '/' in a title would make the later open() fail — confirm acceptable.
        try:
            resp = youtube.videos().list(part="snippet", id=video_id).execute()
        except HttpError as e:
            handle_http_error(e)  # logs and exits the process
        if "items" in resp and resp["items"]:
            video_title = resp["items"][0]["snippet"]["title"].replace(" ", "_")
            output_file = f"{prefix}{video_title}.yaml"
        else:
            logger.error("Video not found.")
            sys.exit(1)
        video_ids = [video_id]
    elif args.channel:
        prefix = "channel-"
        try:
            channel_info = resolve_channel_uploads_playlist_id(api_key, args.channel)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        output_file = f"{prefix}{channel_info['channel_name']}.yaml"
        video_ids = fetch_uploads_playlist_videos(api_key, channel_info["uploads_playlist_id"])
    elif args.playlist:
        prefix = "playlist-"
        try:
            playlist_id = extract_playlist_id(args.playlist)
        except ValueError as e:
            logger.error(e)
            sys.exit(1)
        try:
            pl_response = youtube.playlists().list(part="snippet", id=playlist_id).execute()
        except HttpError as e:
            handle_http_error(e)
        if not pl_response.get("items"):
            logger.error("Invalid playlist ID or no playlist found.")
            sys.exit(1)
        playlist_name = pl_response["items"][0]["snippet"]["title"].replace(" ", "_")
        output_file = f"{prefix}{playlist_name}.yaml"
        # Gather all video IDs in this playlist (paged, 50 per request).
        video_ids = []
        try:
            request = youtube.playlistItems().list(
                part="contentDetails",
                playlistId=playlist_id,
                maxResults=50
            )
            while request:
                response = request.execute()
                for item in response["items"]:
                    video_ids.append(item["contentDetails"]["videoId"])
                request = youtube.playlistItems().list_next(request, response)
        except HttpError as e:
            handle_http_error(e)
    elif args.search:
        # We'll keep "search" prefix or filename as the user did not explicitly say to rename it
        # but we won't limit the results by an integer anymore.
        # We'll gather as many as the API paging returns (subject to some large max).
        # NOTE(review): no upper bound is actually enforced below; a broad
        # query pages until the API stops, which can consume a lot of quota.
        search_term_sanitized = args.search.replace(" ", "_")
        output_file = f"search_{search_term_sanitized}.yaml"
        video_ids = []
        try:
            request = youtube.search().list(
                part="id,snippet",
                q=args.search,
                maxResults=50,
                type="video",
                order=args.order,
                publishedAfter=published_after
            )
            while request:
                search_resp = request.execute()
                for item in search_resp["items"]:
                    vid = item["id"]["videoId"]
                    video_ids.append(vid)
                request = youtube.search().list_next(request, search_resp)
        except HttpError as e:
            handle_http_error(e)
    else:
        logger.error("Provide either --video, --channel, --playlist, or --search.")
        sys.exit(1)
    if not video_ids:
        logger.error("No videos found to process.")
        sys.exit(1)
    # Print fully qualified path to output file on entry
    full_output_path = os.path.abspath(output_file)
    logger.info(f"Output file: {full_output_path}")
    # 1) Open the output file for writing
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            # Initialize current size (bytes written so far)
            current_size = 0
            if args.channel:
                max_bytes = -1  # No size limit for channel downloads
            else:
                max_bytes = -1 if args.max_size == -1 else int(args.max_size * 1024 * 1024)
            total_videos = len(video_ids)
            processed_videos = 0
            for vid in video_ids:
                # Fetch metadata
                metadata = fetch_video_metadata_single(api_key, vid)
                if not metadata:
                    continue  # Skip if metadata not found
                # Fetch transcript
                transcript = fetch_transcript_single(vid)
                metadata["transcript"] = transcript
                # Format YAML block
                video_block = format_video_yaml(metadata)
                # Calculate additional bytes this block would add
                additional_bytes = len(video_block.encode("utf-8"))
                # Check if adding this block exceeds max size; if so, stop the
                # whole run (remaining videos are dropped, not just this one).
                if max_bytes != -1 and (current_size + additional_bytes) > max_bytes:
                    logger.info(f"Skipping video ID {vid} (would exceed max size of {args.max_size} Mb).")
                    logger.info(f"Failed max size of {args.max_size} Mb exceeded.")
                    break
                # Write to file
                f.write(video_block)
                f.flush()  # Ensure data is written to disk
                # Update current size
                current_size += additional_bytes
                processed_videos += 1
        logger.info(f"Processed {processed_videos} out of {total_videos} videos.")
        logger.info(f"Data saved to {output_file}")
    except IOError as e:
        logger.error(f"Failed to write to file {output_file}: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment