Created
March 20, 2025 21:56
-
-
Save cnndabbler/0923c3a7a612b70e37358fa78427074c to your computer and use it in GitHub Desktop.
MCP server reference code used to retrieve YouTube video transcripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from mcp.server import Server, NotificationOptions | |
from mcp.server.models import InitializationOptions | |
from mcp.server.fastmcp import FastMCP | |
import mcp.server.stdio | |
import mcp.types as types | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from youtube_transcript_api._errors import NoTranscriptFound | |
from urllib.parse import urlparse, parse_qs | |
import logging | |
# Send all log records (DEBUG and above) to a local file rather than the
# console -- presumably so log output never mixes with the stdio transport.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    filename="yt_transcript.log",
)

# FastMCP application object; the @mcp.tool() functions below register on it.
mcp = FastMCP("youtube-transcript")
def extract_video_id(url: str) -> str:
    """Extract the video ID from the common forms of YouTube URL.

    Recognised shapes (the scheme is optional):

    * ``youtu.be/<id>``
    * ``youtube.com/watch?v=<id>`` (also the ``www.``, ``m.`` and ``music.`` hosts)
    * ``youtube.com/v/<id>``, ``/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``

    Args:
        url: The URL to inspect.

    Returns:
        The video ID, without any trailing path segments or query string.

    Raises:
        ValueError: If the URL is not a recognised YouTube URL or carries no
            video ID (for example ``/watch`` without a ``v`` parameter).
    """
    short_hosts = ("youtu.be", "www.youtu.be")
    full_hosts = ("youtube.com", "www.youtube.com", "m.youtube.com", "music.youtube.com")
    path_prefixes = ("/v/", "/embed/", "/shorts/", "/live/")

    candidate = url.strip()
    parsed = urlparse(candidate)
    if not parsed.netloc and "://" not in candidate:
        # A scheme-less URL such as "youtu.be/abc" parses with an empty host;
        # retry with an explicit scheme so the host is recognised.
        parsed = urlparse("https://" + candidate)

    host = parsed.hostname or ""
    video_id = ""
    if host in short_hosts:
        # Keep only the first path segment so "/abc/" yields "abc".
        video_id = parsed.path.lstrip("/").split("/", 1)[0]
    elif host in full_hosts:
        if parsed.path in ("/watch", "/watch/"):
            # .get() avoids a KeyError when the "v" parameter is missing.
            video_id = parse_qs(parsed.query).get("v", [""])[0]
        else:
            for prefix in path_prefixes:
                if parsed.path.startswith(prefix):
                    video_id = parsed.path[len(prefix):].split("/", 1)[0]
                    break

    if not video_id:
        raise ValueError("Could not extract video ID from URL")
    return video_id
def _format_timestamp(seconds: float) -> str:
    """Render an offset in seconds as ``[M:SS]`` or, past one hour, ``[H:MM:SS]``."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours > 0:
        return f"[{hours}:{minutes:02d}:{secs:02d}]"
    return f"[{minutes}:{secs:02d}]"


def _entry_field(entry, name: str):
    """Read ``name`` from a transcript entry via attribute, then key access; None if absent."""
    value = getattr(entry, name, None)
    if value is None:
        try:
            value = entry[name]
        except (TypeError, KeyError):
            value = None
    return value


def _format_entry(entry, with_timestamps: bool):
    """Turn one transcript entry into an output line; never raises."""
    try:
        text = _entry_field(entry, "text")
        if text is None:
            # Last resort: keep whatever the entry stringifies to.
            logging.warning(f"Could not extract start/text from entry: {entry}")
            text = str(entry)
        if not with_timestamps:
            return text
        start = _entry_field(entry, "start")
        if start is None:
            start = 0
        return f"{_format_timestamp(start)} {text}"
    except Exception as e:
        # Best effort: one malformed entry must not discard the whole transcript.
        logging.error(f"Error processing entry: {str(e)}")
        return "[ERROR] Could not process entry"


def _list_transcripts(video_id: str):
    """Return the transcript listing for ``video_id``.

    NOTE(review): newer youtube-transcript-api releases appear to replace the
    static ``list_transcripts`` with an instance-level ``list()``; the static
    form is still preferred when present so existing installs behave exactly
    as before. Confirm against the pinned library version.
    """
    legacy = getattr(YouTubeTranscriptApi, "list_transcripts", None)
    if legacy is not None:
        return legacy(video_id)
    return YouTubeTranscriptApi().list(video_id)


def _select_transcript(available_transcripts, language: str):
    """Pick the transcript in ``language``, else English, else the first listed; None if empty."""
    # dict.fromkeys de-duplicates while keeping order, so "en" is tried only once.
    for code in dict.fromkeys((language, "en")):
        try:
            return available_transcripts.find_transcript([code])
        except NoTranscriptFound:
            continue
    return next(iter(available_transcripts), None)


def get_transcript_helper(video_id: str, with_timestamps: bool = False, language: str = "en") -> str:
    """Get the transcript for a video ID and format it as readable text.

    Args:
        video_id: YouTube video ID.
        with_timestamps: When true, prefix each line with ``[M:SS]`` or
            ``[H:MM:SS]`` derived from the entry's ``start`` value.
        language: Preferred language code. Falls back to English, then to the
            first transcript the listing yields.

    Returns:
        The transcript with one entry per line. Failures are reported in the
        returned string rather than raised: ``"No transcript found ..."``,
        ``"Error getting transcript: ..."`` or ``"Error formatting transcript: ..."``.
    """
    logging.debug(f"Getting transcript for video ID: {video_id}")
    try:
        available_transcripts = _list_transcripts(video_id)
        transcript = _select_transcript(available_transcripts, language)
        if transcript is None:
            return f"No transcript found for video {video_id}"
        # Get the transcript data
        transcript_data = transcript.fetch()
        # Log some basic info about what we got
        logging.debug(f"Transcript data type: {type(transcript_data)}")
        if transcript_data:
            logging.debug(f"First entry type: {type(transcript_data[0])}")
    except Exception as e:
        # Tool boundary: report any library or network failure as text.
        logging.error(f"Error getting transcript: {str(e)}")
        return f"Error getting transcript: {str(e)}"

    try:
        return "\n".join(_format_entry(entry, with_timestamps) for entry in transcript_data)
    except Exception as e:
        logging.error(f"Error formatting transcript: {str(e)}")
        return f"Error formatting transcript: {str(e)}"
@mcp.tool()
async def handle_list_tools() -> list[types.Tool]:
    """Describe the two transcript tools offered by this server.

    Returns:
        ``types.Tool`` descriptors for ``get_transcript`` and
        ``get_transcript_with_timestamps``. Both take a required ``video_url``
        and an optional ``language`` that defaults to ``"en"``.

    NOTE(review): because of the ``@mcp.tool()`` decorator this function is
    itself registered as a tool. FastMCP appears to build its tool listing from
    the decorated functions on its own, so this is likely a leftover from the
    low-level ``Server`` API -- confirm whether it is still needed.
    """

    def input_schema() -> dict:
        # Built fresh per tool so the two descriptors never share a mutable dict.
        return {
            "type": "object",
            "properties": {
                "video_url": {"type": "string", "description": "YouTube video URL"},
                "language": {
                    "type": "string",
                    "description": "Language code (e.g. 'en', 'fr')",
                    "default": "en",
                },
            },
            "required": ["video_url"],
        }

    return [
        types.Tool(
            name="get_transcript",
            description="Get transcript from YouTube videos",
            inputSchema=input_schema(),
        ),
        types.Tool(
            name="get_transcript_with_timestamps",
            description="Get transcript from YouTube videos with timestamps",
            inputSchema=input_schema(),
        ),
    ]
@mcp.tool()
async def get_transcript(video_url: str, language: str = "en") -> list[types.TextContent]:
    """Get transcript from YouTube videos.

    Args:
        video_url: YouTube video URL.
        language: Language code (e.g. 'en', 'fr').

    Returns:
        A single text item holding the transcript, or a message explaining
        why it could not be produced.
    """
    logging.debug(f"get_transcript called with video_url: {video_url}")
    if not video_url:
        return [types.TextContent(type="text", text="No video URL provided")]

    try:
        video_id = extract_video_id(video_url)
    except (ValueError, KeyError) as e:
        # KeyError covers a /watch URL without a "v" parameter, which would
        # otherwise escape this tool as an unhandled exception.
        return [types.TextContent(type="text", text=f"Could not extract video ID from URL: {video_url}\nError: {str(e)}")]
    logging.debug(f"Extracted video ID: {video_id}")

    transcript_text = get_transcript_helper(video_id, False, language)
    return [types.TextContent(type="text", text=transcript_text)]
@mcp.tool()
async def get_transcript_with_timestamps(video_url: str, language: str = "en") -> list[types.TextContent]:
    """Get transcript from YouTube videos with timestamps.

    Args:
        video_url: YouTube video URL.
        language: Language code (e.g. 'en', 'fr').

    Returns:
        A single text item holding the timestamped transcript, or a message
        explaining why it could not be produced.
    """
    logging.debug(f"get_transcript_with_timestamps called with video_url: {video_url}")
    if not video_url:
        return [types.TextContent(type="text", text="No video URL provided")]

    try:
        video_id = extract_video_id(video_url)
    except (ValueError, KeyError) as e:
        # KeyError covers a /watch URL without a "v" parameter, which would
        # otherwise escape this tool as an unhandled exception.
        return [types.TextContent(type="text", text=f"Could not extract video ID from URL: {video_url}\nError: {str(e)}")]
    logging.debug(f"Extracted video ID: {video_id}")

    transcript_text = get_transcript_helper(video_id, True, language)
    return [types.TextContent(type="text", text=transcript_text)]
if __name__ == "__main__":
    # Executed as a script: serve requests over standard input/output.
    mcp.run(transport="stdio")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment