@spdin
Created July 1, 2025 02:12
import asyncio
import os
import sys
import re
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import aiohttp
import aiofiles
from typing import Any, Sequence
import logging
# Add the mcp package to Python path - adjust as needed
sys.path.append(os.path.join(os.path.dirname(__file__), "."))
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    CallToolRequest,
    CallToolResult,
    ListToolsRequest,
    TextContent,
    Tool,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gdrive-download-server")
# Create the MCP server instance
app = Server("gdrive-download-server")
# Ensure downloads directory exists
DOWNLOADS_DIR = Path.home() / "downloads"
DOWNLOADS_DIR.mkdir(exist_ok=True)
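# Note: DOWNLOADS_DIR resolves to ~/downloads (lowercase), which is created if it
# does not exist and is distinct from the OS "Downloads" folder on most systems.
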
def extract_google_drive_file_id(url: str) -> str:
    """
    Extract Google Drive file ID from various Google Drive URL formats.

    Supported formats:
    - https://drive.google.com/file/d/FILE_ID/view?usp=sharing
    - https://drive.google.com/open?id=FILE_ID
    - https://drive.google.com/uc?id=FILE_ID
    """
    # Pattern for /file/d/FILE_ID/view format
    match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
    if match:
        return match.group(1)

    # Pattern for ?id=FILE_ID format
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if 'id' in query_params:
        return query_params['id'][0]

    raise ValueError(f"Could not extract Google Drive file ID from URL: {url}")
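
# Illustration of the supported formats above (FILE_ID is a placeholder, not a real Drive file ID):
#   extract_google_drive_file_id("https://drive.google.com/file/d/FILE_ID/view?usp=sharing")  -> "FILE_ID"
#   extract_google_drive_file_id("https://drive.google.com/open?id=FILE_ID")                  -> "FILE_ID"
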
def get_google_drive_download_url(file_id: str) -> str:
    """Convert Google Drive file ID to direct download URL."""
    return f"https://drive.google.com/uc?export=download&id={file_id}"


def get_google_drive_confirm_url(file_id: str, confirm_token: str) -> str:
    """Get Google Drive download URL with confirmation token for large files."""
    return f"https://drive.google.com/uc?export=download&confirm={confirm_token}&id={file_id}"

async def get_filename_from_response_headers(response) -> str | None:
    """Extract the filename from the response headers, or return None if not present."""
    content_disposition = response.headers.get('Content-Disposition', '')
    if content_disposition:
        # Try to extract filename from Content-Disposition header
        match = re.search(r'filename[*]?=["\']?([^"\';\n]+)', content_disposition)
        if match:
            return match.group(1).strip('"\'')
    return None

async def download_google_drive_file(url: str, filename: str | None = None) -> tuple[bool, str, str]:
    """
    Download a file from a Google Drive sharing URL.

    Returns (success, message, filepath).
    """
    try:
        # Extract file ID from the Google Drive URL
        file_id = extract_google_drive_file_id(url)
        logger.info(f"Extracted Google Drive file ID: {file_id}")

        # Get the direct download URL
        download_url = get_google_drive_download_url(file_id)

        async with aiohttp.ClientSession() as session:
            logger.info(f"Starting Google Drive download from file ID: {file_id}")

            # First request to get the file or confirmation page
            async with session.get(download_url) as response:
                if response.status != 200:
                    return False, f"HTTP {response.status}: Failed to access Google Drive file {file_id}", ""

                # Check if this is actually a direct download (binary content)
                content_type = response.headers.get('Content-Type', '').lower()
                if 'text/html' in content_type:
                    # This is likely a confirmation page
                    content = await response.text()

                    # Check if this is a confirmation page (for large files)
                    confirm_match = re.search(r'confirm=([a-zA-Z0-9_-]+)', content)
                    if confirm_match or 'download_warning' in content:
                        logger.info("Large file detected, handling confirmation...")

                        # Try to extract confirmation token
                        if confirm_match:
                            confirm_token = confirm_match.group(1)
                        else:
                            # Look for form-based confirmation
                            form_match = re.search(r'name="confirm" value="([^"]+)"', content)
                            if form_match:
                                confirm_token = form_match.group(1)
                            else:
                                # Fallback: try the common confirmation token
                                confirm_token = "t"

                        # Make second request with confirmation. The response is kept open
                        # (no `async with`) so its body can still be streamed below;
                        # closing the session releases it afterwards.
                        confirm_url = get_google_drive_confirm_url(file_id, confirm_token)
                        confirm_response = await session.get(confirm_url)
                        if confirm_response.status != 200:
                            return False, f"HTTP {confirm_response.status}: Failed to confirm Google Drive download", ""
                        response = confirm_response
                    else:
                        # HTML response but not a confirmation page - might be an error
                        if 'Sorry, you can\'t view or download this file at this time' in content:
                            return False, "File is not publicly accessible or has been removed", ""
                        elif 'Google Drive - Virus scan warning' in content:
                            # Try to find the "download anyway" link
                            download_anyway_match = re.search(r'href="([^"]*&amp;confirm=t[^"]*)"', content)
                            if download_anyway_match:
                                confirm_url = "https://drive.google.com" + download_anyway_match.group(1).replace('&amp;', '&')
                                # Keep this response open as well so it can be streamed below
                                response = await session.get(confirm_url)
                            else:
                                return False, "Unable to bypass virus scan warning", ""
                        else:
                            return False, "Unexpected HTML response from Google Drive", ""

                # Try to get filename from headers
                if not filename:
                    filename = await get_filename_from_response_headers(response)
                    if not filename:
                        filename = f"gdrive_file_{file_id}"

                filepath = DOWNLOADS_DIR / filename

                # Check if file already exists
                # if filepath.exists():
                #     return False, f"File {filename} already exists in downloads directory", str(filepath)

                # Get content length for progress tracking
                content_length = response.headers.get('Content-Length')
                if content_length:
                    total_size = int(content_length)
                    logger.info(f"Downloading {filename} ({total_size} bytes)")
                else:
                    logger.info(f"Downloading {filename} (size unknown)")

                # Download and save file
                async with aiofiles.open(filepath, 'wb') as f:
                    downloaded = 0
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
                        downloaded += len(chunk)

                        # Log progress for large files (the 8192-byte chunk size divides 1 MB evenly)
                        if content_length and downloaded % (1024 * 1024) == 0:  # Every MB
                            progress = (downloaded / total_size) * 100
                            logger.info(f"Download progress: {progress:.1f}%")

                file_size = filepath.stat().st_size
                logger.info(f"Successfully downloaded {filename} ({file_size} bytes)")
                return True, f"Successfully downloaded {filename} ({file_size} bytes)", str(filepath)

    except ValueError as e:
        logger.error(f"Invalid Google Drive URL: {e}")
        return False, f"Invalid Google Drive URL: {str(e)}", ""
    except aiohttp.ClientError as e:
        logger.error(f"Network error downloading from Google Drive: {e}")
        return False, f"Network error: {str(e)}", ""
    except Exception as e:
        logger.error(f"Error downloading from Google Drive: {e}")
        return False, f"Download failed: {str(e)}", ""
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
    """List available tools for the Google Drive download server."""
    return [
        Tool(
            name="download_gdrive_file",
            description="Download a file from a Google Drive sharing URL",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The Google Drive sharing URL (e.g., https://drive.google.com/file/d/FILE_ID/view?usp=sharing)"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Optional custom filename. If not provided, will try to extract from Google Drive response",
                        "default": None
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="extract_gdrive_file_id",
            description="Extract the file ID from a Google Drive URL without downloading",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The Google Drive URL to extract file ID from"
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="list_downloads",
            description="List all files in the downloads directory",
            inputSchema={
                "type": "object",
                "properties": {},
                "additionalProperties": False
            }
        ),
        Tool(
            name="delete_download",
            description="Delete a specific file from the downloads directory",
            inputSchema={
                "type": "object",
                "properties": {
                    "filename": {
                        "type": "string",
                        "description": "The name of the file to delete from downloads directory"
                    }
                },
                "required": ["filename"]
            }
        )
    ]
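
# Example arguments an MCP client might send for each tool (shapes follow the
# inputSchema definitions above; URL and filename values are placeholders):
#   download_gdrive_file   -> {"url": "https://drive.google.com/file/d/FILE_ID/view?usp=sharing",
#                              "filename": "report.pdf"}
#   extract_gdrive_file_id -> {"url": "https://drive.google.com/open?id=FILE_ID"}
#   list_downloads         -> {}
#   delete_download        -> {"filename": "report.pdf"}
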
@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls for the Google Drive download server."""
    if name == "download_gdrive_file":
        url = arguments.get("url")
        filename = arguments.get("filename")

        if not url:
            return [TextContent(type="text", text="Error: URL is required")]

        # Validate that this is a Google Drive URL
        if "drive.google.com" not in url:
            return [TextContent(type="text", text="Error: This tool only supports Google Drive URLs")]

        success, message, filepath = await download_google_drive_file(url, filename)

        if success:
            return [TextContent(
                type="text",
                text=f"βœ… {message}\nSaved to: {filepath}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"❌ {message}"
            )]

    elif name == "extract_gdrive_file_id":
        url = arguments.get("url")

        if not url:
            return [TextContent(type="text", text="Error: URL is required")]

        try:
            file_id = extract_google_drive_file_id(url)
            download_url = get_google_drive_download_url(file_id)
            return [TextContent(
                type="text",
                text=f"βœ… Google Drive File ID: {file_id}\nπŸ“₯ Direct download URL: {download_url}"
            )]
        except ValueError as e:
            return [TextContent(type="text", text=f"❌ {str(e)}")]

    elif name == "list_downloads":
        try:
            if not DOWNLOADS_DIR.exists():
                return [TextContent(type="text", text="Downloads directory does not exist")]

            files = []
            for file_path in DOWNLOADS_DIR.iterdir():
                if file_path.is_file():
                    size = file_path.stat().st_size
                    size_mb = size / (1024 * 1024)
                    files.append(f"πŸ“„ {file_path.name} ({size_mb:.2f} MB)")

            if not files:
                return [TextContent(type="text", text="No files in downloads directory")]

            file_list = "\n".join(files)
            return [TextContent(
                type="text",
                text=f"Files in downloads directory:\n{file_list}"
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error listing downloads: {str(e)}")]

    elif name == "delete_download":
        filename = arguments.get("filename")

        if not filename:
            return [TextContent(type="text", text="Error: filename is required")]

        try:
            filepath = DOWNLOADS_DIR / filename
            if not filepath.exists():
                return [TextContent(type="text", text=f"File {filename} not found in downloads directory")]

            filepath.unlink()
            return [TextContent(type="text", text=f"βœ… Successfully deleted {filename}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error deleting {filename}: {str(e)}")]

    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
    """Main function to run the MCP server."""
    # Run the server using stdio transport
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gdrive-download-server",
                server_version="0.1.0",
                capabilities=app.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
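
# Client-side sketch (assumption: the stdio client helpers from the `mcp` Python
# package, i.e. ClientSession, StdioServerParameters and stdio_client; the
# "gdrive_server.py" path is a hypothetical name for this file):
#
#   from mcp import ClientSession, StdioServerParameters
#   from mcp.client.stdio import stdio_client
#
#   async def call_download(url: str):
#       params = StdioServerParameters(command="python", args=["gdrive_server.py"])
#       async with stdio_client(params) as (read, write):
#           async with ClientSession(read, write) as session:
#               await session.initialize()
#               return await session.call_tool("download_gdrive_file", {"url": url})
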
if __name__ == "__main__":
    asyncio.run(main())