Created
March 20, 2025 21:59
-
-
Save cnndabbler/d8cf6bd1b24836ddc47043e5b0304cb6 to your computer and use it in GitHub Desktop.
Gradio UI as a MCP client to the transcript MCP server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import logging | |
import sys | |
import os | |
from pathlib import Path | |
from contextlib import AsyncExitStack | |
from typing import Optional, List, Dict, Any | |
import gradio as gr | |
# Import MCP SDK components | |
from mcp import ClientSession, StdioServerParameters, types | |
from mcp.client.stdio import stdio_client | |
# Configure logging | |
logging.basicConfig( | |
level=logging.DEBUG, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler("mcp_gradio.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger("mcp-client") | |
class MCPClient: | |
"""Client for interacting with the YouTube Transcript MCP server.""" | |
def __init__(self): | |
"""Initialize the client.""" | |
self.exit_stack = AsyncExitStack() | |
self.session: Optional[ClientSession] = None | |
self.available_tools: List[types.Tool] = [] | |
async def connect_to_server(self, server_path: str) -> bool: | |
"""Connect to an MCP server. | |
Args: | |
server_path: Path to the server script | |
Returns: | |
True if connection was successful, False otherwise | |
""" | |
try: | |
logger.info(f"Connecting to server at {server_path}") | |
# Configure server parameters | |
server_params = StdioServerParameters( | |
command=sys.executable, # Use the current Python interpreter | |
args=[str(server_path)], # Convert Path to string | |
env=None # Use the current environment | |
) | |
# Create STDIO connection | |
logger.debug("Establishing STDIO transport") | |
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) | |
read_stream, write_stream = stdio_transport | |
logger.debug("STDIO transport established") | |
# Create client session | |
logger.debug("Creating client session") | |
self.session = await self.exit_stack.enter_async_context( | |
ClientSession(read_stream, write_stream) | |
) | |
# Initialize connection | |
logger.debug("Initializing connection") | |
await self.session.initialize() | |
logger.debug("Connection initialized") | |
# List available tools | |
logger.debug("Requesting available tools") | |
response = await self.session.list_tools() | |
self.available_tools = response.tools | |
tool_names = [tool.name for tool in self.available_tools] | |
logger.info(f"Available tools: {tool_names}") | |
return True | |
except Exception as e: | |
logger.error(f"Error connecting to server: {str(e)}", exc_info=True) | |
await self.cleanup() | |
return False | |
async def cleanup(self): | |
"""Close the connection and clean up resources.""" | |
if self.exit_stack: | |
await self.exit_stack.aclose() | |
self.session = None | |
logger.info("Disconnected from server") | |
async def fetch_transcript(self, video_url: str, with_timestamps: bool = False, language: str = "en") -> str: | |
"""Get transcript for a YouTube video. | |
Args: | |
video_url: URL of the YouTube video | |
with_timestamps: Whether to include timestamps in the transcript | |
language: Language code for the transcript | |
Returns: | |
The transcript text or an error message | |
""" | |
if not self.session: | |
return "Not connected to server. Please connect first." | |
try: | |
# Select the appropriate tool based on timestamps preference | |
tool_name = "get_transcript_with_timestamps" if with_timestamps else "get_transcript" | |
# Call the tool | |
logger.info(f"Calling tool {tool_name} with video_url={video_url}, language={language}") | |
result = await self.session.call_tool( | |
tool_name, | |
{ | |
"video_url": video_url, | |
"language": language | |
} | |
) | |
# Extract text content from result | |
if result and result.content and len(result.content) > 0: | |
return result.content[0].text | |
return "No transcript content received" | |
except Exception as e: | |
logger.error(f"Error getting transcript: {str(e)}", exc_info=True) | |
return f"Error: {str(e)}" | |
# Main Gradio application | |
class TranscriptApp: | |
"""Gradio application for the YouTube Transcript client.""" | |
def __init__(self, server_path: str): | |
"""Initialize the application. | |
Args: | |
server_path: Path to the MCP server script | |
""" | |
self.server_path = server_path | |
self.client = None | |
self.loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self.loop) | |
def get_client(self) -> MCPClient: | |
"""Get or create the MCP client. | |
Returns: | |
The MCP client instance | |
""" | |
if self.client is None: | |
self.client = MCPClient() | |
return self.client | |
def connect_to_server(self) -> str: | |
"""Connect to the MCP server. | |
Returns: | |
A status message | |
""" | |
client = self.get_client() | |
try: | |
# Run the async connection function in the event loop | |
connected = self.loop.run_until_complete( | |
client.connect_to_server(self.server_path) | |
) | |
if connected: | |
tool_names = [tool.name for tool in client.available_tools] | |
return f"✅ Successfully connected to server\nAvailable tools: {tool_names}" | |
else: | |
return "❌ Failed to connect to server" | |
except Exception as e: | |
logger.error(f"Error connecting to server: {str(e)}", exc_info=True) | |
return f"❌ Error: {str(e)}" | |
def process_video(self, video_url: str, include_timestamps: bool, language: str) -> str: | |
"""Process a video URL to get its transcript. | |
Args: | |
video_url: URL of the YouTube video | |
include_timestamps: Whether to include timestamps in the transcript | |
language: Language code for the transcript | |
Returns: | |
The transcript text or an error message | |
""" | |
if not video_url or not video_url.strip(): | |
return "Please enter a YouTube URL" | |
client = self.get_client() | |
# Check if client is connected | |
if not client.session: | |
connection_result = self.connect_to_server() | |
if "Failed" in connection_result or "Error" in connection_result: | |
return connection_result | |
try: | |
# Run the async function in the event loop with a timeout | |
return self.loop.run_until_complete( | |
asyncio.wait_for( | |
client.fetch_transcript(video_url, include_timestamps, language), | |
timeout=60 # 60 second timeout | |
) | |
) | |
except asyncio.TimeoutError: | |
logger.error("Timeout occurred while waiting for transcript") | |
return "Operation timed out. The server is taking too long to respond." | |
except Exception as e: | |
logger.error(f"Error processing video: {str(e)}", exc_info=True) | |
return f"Error: {str(e)}" | |
def cleanup(self): | |
"""Clean up resources when the application is closed.""" | |
if self.client: | |
try: | |
self.loop.run_until_complete(self.client.cleanup()) | |
except Exception as e: | |
logger.error(f"Error during cleanup: {str(e)}", exc_info=True) | |
if self.loop and not self.loop.is_closed(): | |
self.loop.close() | |
def create_interface(self): | |
"""Create the Gradio interface. | |
Returns: | |
The Gradio Blocks interface | |
""" | |
with gr.Blocks(title="YouTube Transcript Extractor") as demo: | |
gr.Markdown("# YouTube Transcript Extractor") | |
gr.Markdown("Extract transcripts from YouTube videos using MCP") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
connect_button = gr.Button("Connect to Server", variant="primary") | |
video_url = gr.Textbox( | |
label="YouTube URL", | |
placeholder="https://www.youtube.com/watch?v=...", | |
info="Enter any YouTube video URL" | |
) | |
with gr.Row(): | |
include_timestamps = gr.Checkbox( | |
label="Include Timestamps", | |
value=False, | |
info="Add timestamps to the transcript" | |
) | |
language = gr.Dropdown( | |
label="Language", | |
choices=["en", "fr", "es", "de", "it", "ja", "ko", "pt", "ru", "zh-cn"], | |
value="en", | |
info="Select preferred transcript language" | |
) | |
submit_button = gr.Button("Get Transcript", variant="secondary") | |
with gr.Column(scale=2): | |
output = gr.TextArea( | |
label="Transcript", | |
placeholder="Connect to the server and enter a YouTube URL to get started...", | |
lines=20, | |
max_lines=50 | |
) | |
with gr.Accordion("How It Works", open=False): | |
gr.Markdown(""" | |
### Technical Details | |
This application uses the Model Context Protocol (MCP) to communicate with a YouTube Transcript service. | |
1. The MCP client connects to the transcript server using STDIO transport | |
2. The client lists available tools from the server | |
3. When you submit a YouTube URL, the client calls the appropriate tool | |
4. The tool extracts the transcript and returns it as formatted text | |
The MCP protocol standardizes communication between AI applications and tools, | |
making it easy to connect different services together. | |
""") | |
# Connect event handlers | |
connect_button.click( | |
fn=self.connect_to_server, | |
inputs=[], | |
outputs=output | |
) | |
submit_button.click( | |
fn=self.process_video, | |
inputs=[video_url, include_timestamps, language], | |
outputs=output | |
) | |
# Register cleanup handler | |
demo.close(self.cleanup) | |
return demo | |
def run(self): | |
"""Run the Gradio application.""" | |
# Create and launch the interface | |
demo = self.create_interface() | |
# Connect to server at startup and show the result in the console | |
connection_status = self.connect_to_server() | |
print(f"\n--- Connection Status ---\n{connection_status}\n") | |
# Launch the Gradio interface | |
demo.launch(share=False) | |
if __name__ == "__main__": | |
# Get the path to the MCP server script | |
script_dir = Path(__file__).parent | |
server_path = script_dir / "yt_dlp_transcript_server.py" | |
# Make sure server path exists | |
if not server_path.exists(): | |
print(f"ERROR: Server script not found at {server_path}") | |
print(f"Current directory: {Path.cwd()}") | |
print("Please make sure the yt_dlp_transcript_server.py file is in the same directory as this script.") | |
sys.exit(1) | |
print(f"Server script found at: {server_path}") | |
# Create and run the application | |
app = TranscriptApp(server_path) | |
try: | |
app.run() | |
finally: | |
app.cleanup() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment