Last active
April 8, 2025 05:09
-
-
Save laiso/27dd045643a7e204bd19862f14471f0a to your computer and use it in GitHub Desktop.
MDS(MPC Documents Server) Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import os
import re
import shutil
import string
import sys
import urllib.error
import urllib.request
import zipfile
from typing import List, Optional

from mcp.server.fastmcp import FastMCP
""" | |
# mcp.json | |
{ | |
"mcp": { | |
"servers": { | |
"react-router-docs-v7.5.0": { | |
"type": "stdio", | |
"command": "uv", | |
"args": [ | |
"run", | |
"/path/to/unified_server.py", | |
"https://github.com/remix-run/react-router/tree/15e0a5ed5cead6bd5125ea3c388d5eee9916a6bd/docs" | |
] | |
}, | |
"hono-docs": { | |
"type": "stdio", | |
"command": "uv", | |
"args": [ | |
"run", | |
"/path/to/unified_server.py", | |
"https://github.com/honojs/website/tree/main/docs" | |
] | |
} | |
} | |
} | |
} | |
""" | |
# Base directory for downloaded documentation: the default root searched by
# search_docs_files and the only location read_file_tool agrees to read from.
DOCS_DIR: str = os.path.expanduser("~/.mds")
def parse_github_url(url: str) -> tuple[str, str, str]:
    """Split a GitHub "tree" URL into its repository, ref and sub-directory.

    Args:
        url: A URL of the form
            ``https://github.com/<owner>/<repo>/tree/<ref>[/<subdir>]``.

    Returns:
        ``(slug, ref, subdir)`` where ``slug`` is ``"<owner>/<repo>"`` and
        ``subdir`` is ``""`` when the URL points at the repository root.
        ``subdir`` never carries leading or trailing slashes.

    Raises:
        ValueError: If ``url`` does not match the expected layout.
    """
    # A query string or fragment (e.g. "?plain=1", "#readme") is not part of
    # the path and must not end up inside the sub-directory.
    path_only = re.split(r"[?#]", url, maxsplit=1)[0]
    match = re.search(
        r"github\.com/([^/]+/[^/]+)/tree/([^/]+)(?:/(.*))?", path_only
    )
    if not match:
        raise ValueError(f"Invalid GitHub URL format: {url}")
    slug, branch, subdir = match.groups()
    # Normalise ".../docs/" to "docs" so callers that take the basename or
    # build a cache key from it do not get an empty component.
    return slug, branch, (subdir or "").strip("/")
def _cache_dir_name(slug: str, ref: str, subdir: str) -> str:
    """Build the flat directory name that caches one (repo, ref, subdir) checkout."""
    # Key on the whole sub-path rather than its basename so that "a/docs" and
    # "b/docs" of the same repository do not share (and shadow) one cache entry.
    subdir_key = subdir.strip("/").replace("/", "__") or "root"
    return "__".join((slug.replace("/", "__"), ref.replace("/", "__"), subdir_key))


def _archive_urls(slug: str, ref: str) -> List[str]:
    """Return candidate ZIP archive URLs for *ref*, most likely first."""
    if len(ref) == 40 and all(c in string.hexdigits for c in ref):
        # A full commit SHA is addressed directly.
        return [f"https://github.com/{slug}/archive/{ref}.zip"]
    # A symbolic ref may be a branch or a tag; try the branch first.
    return [
        f"https://github.com/{slug}/archive/refs/heads/{ref}.zip",
        f"https://github.com/{slug}/archive/refs/tags/{ref}.zip",
    ]


def _download_archive(urls: List[str]) -> bytes:
    """Download the first archive in *urls* that exists and return its bytes."""
    not_found = None
    for zip_url in urls:
        print(f"🔍 Downloading ZIP from {zip_url}", file=sys.stderr)
        try:
            # The timeout bounds each blocking socket operation so a stalled
            # connection cannot hang server start-up forever.
            with urllib.request.urlopen(zip_url, timeout=60) as response:
                status = response.status
                data = response.read()
        except urllib.error.HTTPError as e:
            if e.code == 404:
                # Wrong ref namespace (branch vs. tag): try the next candidate.
                not_found = e
                continue
            raise RuntimeError(f"❌ Failed to download ZIP: {e}") from e
        except Exception as e:
            raise RuntimeError(f"❌ Failed to download ZIP: {e}") from e
        if status != 200:
            raise RuntimeError(f"❌ Failed to download ZIP: HTTP {status}")
        return data
    raise RuntimeError(f"❌ Failed to download ZIP: {not_found}") from not_found


def _zip_root_prefix(names: List[str]) -> str:
    """Return the single top-level directory ("name/") wrapping every entry, or ""."""
    root = names[0].split("/", 1)[0] + "/"
    return root if all(name.startswith(root) for name in names) else ""


def _extract_subtree(data: bytes, subdir: str, dest: str) -> None:
    """Extract *subdir* (or the whole tree when empty) of a ZIP archive into *dest*."""
    subdir_clean = subdir.strip("/")
    dest_root = os.path.realpath(dest)
    try:
        with zipfile.ZipFile(io.BytesIO(data)) as z:
            names = z.namelist()
            if not names:
                raise RuntimeError("❌ ZIP file is empty.")
            root = _zip_root_prefix(names)
            wanted = f"{root}{subdir_clean}/" if subdir_clean else root
            extracted = False
            for member in names:
                if not member.startswith(wanted):
                    continue
                rel_path = member[len(wanted):]
                if not rel_path:
                    # The entry for the requested directory itself.
                    continue
                target = os.path.realpath(os.path.join(dest, rel_path))
                # Archive member names are untrusted: refuse anything that
                # would land outside the destination ("zip slip").
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise RuntimeError(
                        f"❌ Refusing to extract '{member}': path escapes {dest}"
                    )
                if member.endswith("/"):
                    os.makedirs(target, exist_ok=True)
                else:
                    os.makedirs(os.path.dirname(target), exist_ok=True)
                    with open(target, "wb") as f:
                        f.write(z.read(member))
                    extracted = True
    except RuntimeError:
        # Already carries a user-facing message; do not wrap it a second time.
        raise
    except zipfile.BadZipFile as e:
        raise RuntimeError(
            "❌ Failed to extract ZIP: File is corrupted or not a ZIP file."
        ) from e
    except Exception as e:
        raise RuntimeError(f"❌ Failed to extract ZIP: {e}") from e

    if not extracted:
        if subdir_clean:
            raise RuntimeError(
                f"⚠ Directory not found in ZIP: '{subdir_clean}' under prefix '{root}'"
            )
        raise RuntimeError("❌ Failed to extract any files from the repository root.")


def checkout(url: str) -> tuple[str, str, str, str]:
    """Download the documentation tree referenced by a GitHub URL into DOCS_DIR.

    The archive of the referenced branch, tag or commit is downloaded as a ZIP
    and only the requested sub-directory is unpacked. A non-empty directory
    left by an earlier successful run is reused without any network access.
    Progress messages are written to stderr.

    Args:
        url: GitHub "tree" URL, as accepted by ``parse_github_url``.

    Returns:
        ``(slug, ref, subdir, repo_path)`` where ``repo_path`` is the local
        directory holding the extracted files.

    Raises:
        RuntimeError: If the URL is invalid, the download fails, or nothing
            could be extracted. The message is ready to show to the user.
    """
    try:
        slug, branch, subdir = parse_github_url(url)
    except ValueError as e:
        raise RuntimeError(f"❌ {e}") from e

    # Store checkouts under DOCS_DIR so they stay inside the directory that
    # read_file_tool is willing to serve.
    if not os.path.isdir(DOCS_DIR):
        os.makedirs(DOCS_DIR, exist_ok=True)
        print(f"Created directory {DOCS_DIR}", file=sys.stderr)

    repo_name = os.path.join(DOCS_DIR, _cache_dir_name(slug, branch, subdir))
    # Only a non-empty directory counts as a usable cache entry.
    if os.path.isdir(repo_name) and os.listdir(repo_name):
        print(f"⚠ Skipping {repo_name}: directory already exists.", file=sys.stderr)
        return slug, branch, subdir, repo_name

    data = _download_archive(_archive_urls(slug, branch))

    print("📦 Extracting ZIP contents...", file=sys.stderr)
    try:
        _extract_subtree(data, subdir, repo_name)
    except BaseException:
        # Never leave a half-extracted tree behind: the next run would take
        # it for a valid cache entry and skip the download.
        shutil.rmtree(repo_name, ignore_errors=True)
        raise

    if not os.path.exists(repo_name) or not os.listdir(repo_name):
        raise RuntimeError(
            f"❌ Failed to extract '{repo_name}'. Directory is missing or empty after extraction attempt."
        )
    print(f"✅ Successfully checked out to {repo_name}", file=sys.stderr)
    return slug, branch, subdir, repo_name
def is_plain_text_file(filepath: str) -> bool:
    """Report whether the start of *filepath* can be read as UTF-8 text.

    Only the first 1024 characters are decoded. Any failure (undecodable
    bytes, missing file, permission problem, path is a directory, ...)
    is reported as ``False`` rather than raised.
    """
    try:
        with open(filepath, encoding="utf-8") as handle:
            handle.read(1024)
    except Exception:  # UnicodeDecodeError as well as every I/O error
        return False
    else:
        return True
def search_docs_files(query: str, current_dir: Optional[str] = None) -> List[str]:
    """Find UTF-8 text files whose *file name* contains ``query``.

    The match is a case-insensitive literal substring test on the file name
    only (not on the file contents). Sub-directories are searched as well;
    symbolic links are neither followed nor reported.

    Args:
        query: Text to look for in file names.
        current_dir: Directory to search; defaults to ``DOCS_DIR``.

    Returns:
        Sorted list of matching file paths. Empty when ``current_dir`` is
        not a directory.

    Raises:
        Exception: If ``current_dir`` itself cannot be listed. Unreadable
            sub-directories are reported on stderr and skipped instead.
    """
    if current_dir is None:
        current_dir = DOCS_DIR
    if not os.path.isdir(current_dir):
        if current_dir == DOCS_DIR:
            print(
                f"Warning: Base directory {DOCS_DIR} does not exist. No files to search.",
                file=sys.stderr,
            )
        return []

    # Compile once instead of once per directory entry.
    name_pattern = re.compile(re.escape(query), re.IGNORECASE)
    results: List[str] = []
    # An explicit stack replaces recursion so that a failure in a nested
    # directory can be told apart from a failure of the search root.
    pending = [current_dir]
    while pending:
        directory = pending.pop()
        try:
            with os.scandir(directory) as entries:
                for entry in entries:
                    try:
                        if entry.is_dir(follow_symlinks=False):
                            pending.append(entry.path)
                        elif (
                            entry.is_file(follow_symlinks=False)
                            and name_pattern.search(entry.name)
                            and is_plain_text_file(entry.path)
                        ):
                            results.append(entry.path)
                    except OSError as e:
                        print(
                            f"Skipping {entry.path}: Cannot access ({e})",
                            file=sys.stderr,
                        )
        except OSError as e:
            if directory == current_dir:
                raise Exception(f"Failed to read directory {current_dir}: {e}") from e
            # One unreadable sub-directory must not abort the whole search.
            print(f"Skipping {directory}: Cannot access ({e})", file=sys.stderr)
    # scandir order is arbitrary; sort so results are stable between runs.
    results.sort()
    return results
# --- Tool Definitions --- | |
def docs_search_tool(query: str, repo_dir: str = None) -> str:
    """Search file names for ``query`` and format the outcome as plain text.

    Looks in ``repo_dir`` when one is given, otherwise in ``DOCS_DIR``.
    Returns the matching paths one per line, a fixed "no match" sentence, or
    an error sentence; it never raises.
    """
    try:
        target_dir = repo_dir or DOCS_DIR
        matches = search_docs_files(query, target_dir)
        if not matches:
            return "No matching files found."
        return "\n".join(matches)
    except Exception as err:
        return f"An error occurred: {err}"
def run_checkout(url: str) -> str:
    """Run ``checkout`` and describe the outcome as a single string.

    Returns a success summary, the message of a ``RuntimeError`` raised by
    ``checkout``, or a generic sentence for any other exception. It never
    raises.
    """
    try:
        slug, branch, subdir, repo_path = checkout(url)
    except RuntimeError as err:
        # checkout already phrased this for the end user.
        return str(err)
    except Exception as err:
        return f"An unexpected error occurred: {err}"
    shown_subdir = subdir or "/"
    return f"Checkout successful: {slug} (Branch: {branch}, Subdir: {shown_subdir}) -> {repo_path}"
def read_file_tool(path: str) -> str:
    """Return the UTF-8 contents of a file located inside ``DOCS_DIR``.

    Args:
        path: Path of the file to read, typically one returned by the search
            tool.

    Returns:
        The file contents, or a human-readable error sentence when the path
        lies outside ``DOCS_DIR``, is not a file, or cannot be read or
        decoded. It never raises.
    """
    try:
        # Resolve symlinks and ".." before the containment check so a link
        # placed inside DOCS_DIR cannot be used to read files elsewhere.
        base_dir = os.path.realpath(DOCS_DIR)
        abs_path = os.path.realpath(path)
        try:
            # Compare whole path components: a plain string prefix test would
            # also accept sibling directories such as "~/.mds-other".
            inside = os.path.commonpath([base_dir, abs_path]) == base_dir
        except ValueError:
            # Raised e.g. for paths on different drives: certainly outside.
            inside = False
        if not inside:
            return f"Error: The specified path is outside the allowed directory ({DOCS_DIR})."
        if not os.path.isfile(abs_path):
            return "Error: The specified path is not a file."
        with open(abs_path, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        return "File not found."
    except UnicodeDecodeError:
        return (
            "An error occurred while reading the file: The file is not UTF-8 encoded."
        )
    except Exception as e:
        return f"An error occurred while reading the file: {e}"
if __name__ == "__main__":
    # Entry point: check out the documentation named on the command line,
    # register a search tool and a read tool for it, then serve over MCP.
    #
    # The server is launched with the stdio transport (see the mcp.json
    # example at the top of the file), where stdout carries the protocol
    # stream. Every human-readable status line therefore goes to stderr.
    if len(sys.argv) < 2:
        print(f"Usage: python {sys.argv[0]} <GitHub Repository URL>", file=sys.stderr)
        print(
            "Example: python unified_server.py https://github.com/owner/repo/tree/main/docs",
            file=sys.stderr,
        )
        sys.exit(1)

    url = sys.argv[1]
    try:
        # 1. Perform the initial checkout specified by the command line argument
        print(f"--- Initial Checkout for {url} ---", file=sys.stderr)
        slug, branch, subdir, repo_path = checkout(url)
        print(f"--- Checkout completed for {slug} to {repo_path} ---", file=sys.stderr)

        # 2. Register tools whose names embed the repository, so several
        #    instances of this server can run side by side in one client.
        sanitized_slug = slug.replace("/", "_")
        search_tool_name = f"mds_search_{sanitized_slug}"
        read_tool_name = f"mds_read_{sanitized_slug}"
        tool_description = (
            "Search for files containing <query> in their filename within the "
            f"specific repository's documents: {repo_path} "
            "and return the list of their paths."
        )

        # Closure that pins the search to this checkout's directory.
        def specific_docs_search_tool(query: str) -> str:
            return docs_search_tool(query, repo_path)

        mcp = FastMCP("MDS(MPC Documents Server) Server")
        # Pass name/description by keyword: the positional order of
        # add_tool's optional parameters differs between SDK releases.
        mcp.add_tool(
            specific_docs_search_tool,
            name=search_tool_name,
            description=tool_description,
        )
        print(f"--- Added tool '{search_tool_name}' ---", file=sys.stderr)
        mcp.add_tool(
            read_file_tool,
            name=read_tool_name,
            description="Returns the contents of the file at the specified path.",
        )
        print(f"--- Added tool '{read_tool_name}' ---", file=sys.stderr)

        # 3. Start the main server
        print("--- Starting MCP Server ---", file=sys.stderr)
        mcp.run()
    except RuntimeError as e:
        print(
            f"\n❌ An error occurred during initialization or checkout: {e}",
            file=sys.stderr,
        )
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment