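"""
MCP server that downloads files from Google Drive sharing URLs.

Tools exposed over stdio:
  - download_gdrive_file:   download a file from a Google Drive sharing URL
  - extract_gdrive_file_id: extract the file ID from a Google Drive URL without downloading
  - list_downloads:         list files in the local downloads directory (~/downloads)
  - delete_download:        delete a file from the downloads directory
"""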
import asyncio
import os
import sys
import re
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import aiohttp
import aiofiles
from typing import Any, Sequence
import logging

# Add the mcp package to Python path - adjust as needed
sys.path.append(os.path.join(os.path.dirname(__file__), "."))

from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    CallToolRequest,
    CallToolResult,
    ListToolsRequest,
    TextContent,
    Tool,
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gdrive-download-server")

# Create the MCP server instance
app = Server("gdrive-download-server")

# Ensure the downloads directory exists
DOWNLOADS_DIR = Path.home() / "downloads"
DOWNLOADS_DIR.mkdir(exist_ok=True)
def extract_google_drive_file_id(url: str) -> str:
    """
    Extract the Google Drive file ID from various Google Drive URL formats.

    Supported formats:
    - https://drive.google.com/file/d/FILE_ID/view?usp=sharing
    - https://drive.google.com/open?id=FILE_ID
    - https://drive.google.com/uc?id=FILE_ID
    """
    # Pattern for the /file/d/FILE_ID/view format
    match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
    if match:
        return match.group(1)

    # Pattern for the ?id=FILE_ID format
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if 'id' in query_params:
        return query_params['id'][0]

    raise ValueError(f"Could not extract Google Drive file ID from URL: {url}")
def get_google_drive_download_url(file_id: str) -> str:
    """Convert a Google Drive file ID to a direct download URL."""
    return f"https://drive.google.com/uc?export=download&id={file_id}"


def get_google_drive_confirm_url(file_id: str, confirm_token: str) -> str:
    """Get the Google Drive download URL with a confirmation token for large files."""
    return f"https://drive.google.com/uc?export=download&confirm={confirm_token}&id={file_id}"
async def get_filename_from_response_headers(response) -> str | None:
    """Extract the filename from the response headers, if present."""
    content_disposition = response.headers.get('Content-Disposition', '')
    if content_disposition:
        # Try to extract the filename from the Content-Disposition header
        match = re.search(r'filename[*]?=["\']?([^"\';\n]+)', content_disposition)
        if match:
            return match.group(1).strip('"\'')
    return None
async def download_google_drive_file(url: str, filename: str | None = None) -> tuple[bool, str, str]:
    """
    Download a file from a Google Drive sharing URL.
    Returns (success, message, filepath).
    """
    try:
        # Extract the file ID from the Google Drive URL
        file_id = extract_google_drive_file_id(url)
        logger.info(f"Extracted Google Drive file ID: {file_id}")

        # Get the direct download URL
        download_url = get_google_drive_download_url(file_id)

        async with aiohttp.ClientSession() as session:
            logger.info(f"Starting Google Drive download for file ID: {file_id}")

            # First request to get the file or the confirmation page
            async with session.get(download_url) as response:
                if response.status != 200:
                    return False, f"HTTP {response.status}: Failed to access Google Drive file {file_id}", ""

                # Check whether this is actually a direct download (binary content)
                content_type = response.headers.get('Content-Type', '').lower()
                if 'text/html' in content_type:
                    # This is likely a confirmation page
                    content = await response.text()

                    # Check whether this is a confirmation page (for large files)
                    confirm_match = re.search(r'confirm=([a-zA-Z0-9_-]+)', content)
                    if confirm_match or 'download_warning' in content:
                        logger.info("Large file detected, handling confirmation...")
                        # Try to extract the confirmation token
                        if confirm_match:
                            confirm_token = confirm_match.group(1)
                        else:
                            # Look for a form-based confirmation
                            form_match = re.search(r'name="confirm" value="([^"]+)"', content)
                            if form_match:
                                confirm_token = form_match.group(1)
                            else:
                                # Fallback: try the common confirmation token
                                confirm_token = "t"

                        # Make a second request with the confirmation token.
                        # Not wrapped in `async with`, because the body is streamed
                        # further below; the session cleans it up on close.
                        confirm_url = get_google_drive_confirm_url(file_id, confirm_token)
                        confirm_response = await session.get(confirm_url)
                        if confirm_response.status != 200:
                            return False, f"HTTP {confirm_response.status}: Failed to confirm Google Drive download", ""
                        response = confirm_response
                    else:
                        # HTML response but not a confirmation page - might be an error
                        if "Sorry, you can't view or download this file at this time" in content:
                            return False, "File is not publicly accessible or has been removed", ""
                        elif 'Google Drive - Virus scan warning' in content:
                            # Try to find the "download anyway" link
                            download_anyway_match = re.search(r'href="([^"]*&confirm=t[^"]*)"', content)
                            if download_anyway_match:
                                confirm_url = "https://drive.google.com" + download_anyway_match.group(1).replace('&amp;', '&')
                                response = await session.get(confirm_url)
                            else:
                                return False, "Unable to bypass virus scan warning", ""
                        else:
                            return False, "Unexpected HTML response from Google Drive", ""

                # Try to get the filename from the headers
                if not filename:
                    filename = await get_filename_from_response_headers(response)
                    if not filename:
                        filename = f"gdrive_file_{file_id}"

                filepath = DOWNLOADS_DIR / filename

                # Check if the file already exists
                # if filepath.exists():
                #     return False, f"File {filename} already exists in downloads directory", str(filepath)

                # Get the content length for progress tracking
                content_length = response.headers.get('Content-Length')
                if content_length:
                    total_size = int(content_length)
                    logger.info(f"Downloading {filename} ({total_size} bytes)")
                else:
                    logger.info(f"Downloading {filename} (size unknown)")

                # Download and save the file
                async with aiofiles.open(filepath, 'wb') as f:
                    downloaded = 0
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
                        downloaded += len(chunk)
                        # Log progress for large files
                        if content_length and downloaded % (1024 * 1024) == 0:  # Every MB
                            progress = (downloaded / total_size) * 100
                            logger.info(f"Download progress: {progress:.1f}%")

                file_size = filepath.stat().st_size
                logger.info(f"Successfully downloaded {filename} ({file_size} bytes)")
                return True, f"Successfully downloaded {filename} ({file_size} bytes)", str(filepath)

    except ValueError as e:
        logger.error(f"Invalid Google Drive URL: {e}")
        return False, f"Invalid Google Drive URL: {str(e)}", ""
    except aiohttp.ClientError as e:
        logger.error(f"Network error downloading from Google Drive: {e}")
        return False, f"Network error: {str(e)}", ""
    except Exception as e:
        logger.error(f"Error downloading from Google Drive: {e}")
        return False, f"Download failed: {str(e)}", ""
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
    """List available tools for the Google Drive download server."""
    return [
        Tool(
            name="download_gdrive_file",
            description="Download a file from a Google Drive sharing URL",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The Google Drive sharing URL (e.g., https://drive.google.com/file/d/FILE_ID/view?usp=sharing)"
                    },
                    "filename": {
                        "type": "string",
                        "description": "Optional custom filename. If not provided, will try to extract from the Google Drive response",
                        "default": None
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="extract_gdrive_file_id",
            description="Extract the file ID from a Google Drive URL without downloading",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The Google Drive URL to extract the file ID from"
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="list_downloads",
            description="List all files in the downloads directory",
            inputSchema={
                "type": "object",
                "properties": {},
                "additionalProperties": False
            }
        ),
        Tool(
            name="delete_download",
            description="Delete a specific file from the downloads directory",
            inputSchema={
                "type": "object",
                "properties": {
                    "filename": {
                        "type": "string",
                        "description": "The name of the file to delete from the downloads directory"
                    }
                },
                "required": ["filename"]
            }
        )
    ]
@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls for the Google Drive download server."""
    if name == "download_gdrive_file":
        url = arguments.get("url")
        filename = arguments.get("filename")

        if not url:
            return [TextContent(type="text", text="Error: URL is required")]

        # Validate that this is a Google Drive URL
        if "drive.google.com" not in url:
            return [TextContent(type="text", text="Error: This tool only supports Google Drive URLs")]

        success, message, filepath = await download_google_drive_file(url, filename)

        if success:
            return [TextContent(
                type="text",
                text=f"✅ {message}\nSaved to: {filepath}"
            )]
        else:
            return [TextContent(
                type="text",
                text=f"❌ {message}"
            )]

    elif name == "extract_gdrive_file_id":
        url = arguments.get("url")
        if not url:
            return [TextContent(type="text", text="Error: URL is required")]

        try:
            file_id = extract_google_drive_file_id(url)
            download_url = get_google_drive_download_url(file_id)
            return [TextContent(
                type="text",
                text=f"✅ Google Drive File ID: {file_id}\n📥 Direct download URL: {download_url}"
            )]
        except ValueError as e:
            return [TextContent(type="text", text=f"❌ {str(e)}")]

    elif name == "list_downloads":
        try:
            if not DOWNLOADS_DIR.exists():
                return [TextContent(type="text", text="Downloads directory does not exist")]

            files = []
            for file_path in DOWNLOADS_DIR.iterdir():
                if file_path.is_file():
                    size = file_path.stat().st_size
                    size_mb = size / (1024 * 1024)
                    files.append(f"📄 {file_path.name} ({size_mb:.2f} MB)")

            if not files:
                return [TextContent(type="text", text="No files in downloads directory")]

            file_list = "\n".join(files)
            return [TextContent(
                type="text",
                text=f"Files in downloads directory:\n{file_list}"
            )]
        except Exception as e:
            return [TextContent(type="text", text=f"Error listing downloads: {str(e)}")]

    elif name == "delete_download":
        filename = arguments.get("filename")
        if not filename:
            return [TextContent(type="text", text="Error: filename is required")]

        try:
            filepath = DOWNLOADS_DIR / filename
            if not filepath.exists():
                return [TextContent(type="text", text=f"File {filename} not found in downloads directory")]

            filepath.unlink()
            return [TextContent(type="text", text=f"✅ Successfully deleted {filename}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error deleting {filename}: {str(e)}")]

    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
    """Main function to run the MCP server."""
    # Run the server using the stdio transport
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gdrive-download-server",
                server_version="0.1.0",
                capabilities=app.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())
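# To use this server, register it with an MCP client as a stdio server.
# For example, a Claude Desktop configuration entry might look like the
# following (the server name and file path are placeholders):
#
#   "mcpServers": {
#     "gdrive-download": {
#       "command": "python",
#       "args": ["/path/to/this_file.py"]
#     }
#   }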