Skip to content

Instantly share code, notes, and snippets.

@laiso
Last active April 8, 2025 05:09
Show Gist options
  • Save laiso/27dd045643a7e204bd19862f14471f0a to your computer and use it in GitHub Desktop.
Save laiso/27dd045643a7e204bd19862f14471f0a to your computer and use it in GitHub Desktop.
MDS (MCP Documents Server) Server
import os
import sys
import re
import urllib.request
import io
import zipfile
import string
from typing import List, Optional
from mcp.server.fastmcp import FastMCP
"""
# mcp.json
{
"mcp": {
"servers": {
"react-router-docs-v7.5.0": {
"type": "stdio",
"command": "uv",
"args": [
"run",
"/path/to/unified_server.py",
"https://github.com/remix-run/react-router/tree/15e0a5ed5cead6bd5125ea3c388d5eee9916a6bd/docs"
]
},
"hono-docs": {
"type": "stdio",
"command": "uv",
"args": [
"run",
"/path/to/unified_server.py",
"https://github.com/honojs/website/tree/main/docs"
]
}
}
}
}
"""
# Base directory (under the user's home) that holds checked-out documentation
# trees. It is the fallback search root for search_docs_files and the only
# location read_file_tool is allowed to read from.
DOCS_DIR: str = os.path.expanduser("~/.mds")
def parse_github_url(url: str) -> tuple[str, str, str]:
    """Split a GitHub ``/tree/`` URL into ``(slug, branch, subdir)``.

    Args:
        url: A URL of the form
            ``https://github.com/<owner>/<repo>/tree/<ref>[/<subdir>]``.
            A query string or fragment, if present, is ignored.

    Returns:
        A 3-tuple ``(slug, branch, subdir)`` where ``slug`` is
        ``"<owner>/<repo>"``, ``branch`` is the single path segment after
        ``/tree/`` and ``subdir`` is the remaining path without leading or
        trailing slashes (``""`` when the URL points at the repository root).

    Raises:
        ValueError: If *url* does not contain a ``github.com/.../tree/...`` path.

    Note:
        The ref is assumed to be one path segment, so branch names that
        themselves contain ``/`` cannot be distinguished from a sub-directory.
    """
    # `?` and `#` are excluded so "…/docs?tab=readme" does not leak the query
    # string into the branch or sub-directory.
    pattern = r"github\.com/([^/]+/[^/]+)/tree/([^/?#]+)(?:/([^?#]*))?"
    match = re.search(pattern, url)
    if not match:
        raise ValueError(f"Invalid GitHub URL format: {url}")
    slug, branch, subdir = match.groups()
    # Normalise "docs/" to "docs": callers take os.path.basename() of this
    # value, which would be empty for a path ending in a slash.
    return slug, branch, (subdir or "").strip("/")
def _archive_root(names: list[str]) -> str:
    """Return the top-level folder (with trailing "/") shared by every entry, or ""."""
    root = names[0].split("/", 1)[0] + "/"
    return root if all(name.startswith(root) for name in names) else ""


def _extract_members(archive: zipfile.ZipFile, prefix: str, dest_dir: str) -> int:
    """Extract the entries below *prefix* into *dest_dir*; return how many files were written."""
    dest_root = os.path.realpath(dest_dir)
    written = 0
    for member in archive.namelist():
        if not member.startswith(prefix):
            continue
        rel_path = member[len(prefix):]
        if not rel_path:
            # The entry for the prefix directory itself.
            continue
        target_path = os.path.realpath(os.path.join(dest_root, rel_path))
        # Zip Slip guard: refuse entries ("../x", absolute paths) that would
        # land outside the destination directory.
        if os.path.commonpath([dest_root, target_path]) != dest_root:
            raise RuntimeError(f"❌ Unsafe path in ZIP: {member}")
        if member.endswith("/"):
            os.makedirs(target_path, exist_ok=True)
            continue
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(target_path, "wb") as f:
            f.write(archive.read(member))
        written += 1
    return written


def checkout(url: str) -> tuple[str, str, str, str]:
    """Download a GitHub tree URL as a ZIP and cache the requested sub-directory.

    The files are stored under ``DOCS_DIR`` in a directory named
    ``<owner>__<repo>__<ref>__<last subdir component or "root">``. When that
    directory already exists and is non-empty the download is skipped.

    Extraction happens in a temporary sibling directory that is renamed into
    place only after it succeeded, so a failed or interrupted run never
    leaves a half-populated directory that later runs would mistake for a
    valid cache entry.

    Args:
        url: GitHub URL accepted by ``parse_github_url``.

    Returns:
        ``(slug, branch, subdir, repo_path)`` where ``repo_path`` is the local
        cache directory.

    Raises:
        RuntimeError: If the URL is invalid, the download fails, the archive
            is corrupt/empty/unsafe, or the requested sub-directory is not in
            the archive.
    """
    # Function-scope imports: these modules are needed only by this function.
    import shutil
    import tempfile

    try:
        slug, branch, subdir = parse_github_url(url)
    except ValueError as e:
        raise RuntimeError(f"❌ {e}") from e

    log: list[str] = []
    mds_base_dir = DOCS_DIR
    if not os.path.isdir(mds_base_dir):
        os.makedirs(mds_base_dir, exist_ok=True)
        log.append(f"Created directory {mds_base_dir}")

    # Strip slashes first so "docs/" names the cache directory "...__docs"
    # rather than "...__" (basename of a path ending in "/" is empty).
    subdir_clean = subdir.strip("/")
    sanitized_slug = slug.replace("/", "__")
    sanitized_branch = branch.replace("/", "__")
    sanitized_subdir = os.path.basename(subdir_clean) if subdir_clean else "root"
    repo_name = os.path.join(
        mds_base_dir, f"{sanitized_slug}__{sanitized_branch}__{sanitized_subdir}"
    )

    if os.path.exists(repo_name):
        is_empty_dir = os.path.isdir(repo_name) and not os.listdir(repo_name)
        if not is_empty_dir:
            log.append(f"⚠ Skipping {repo_name}: directory already exists.")
            print("\n".join(log), file=sys.stderr)
            return slug, branch, subdir, repo_name
        # An empty directory is a leftover of an earlier failed run, not a
        # usable cache entry: remove it and download again.
        os.rmdir(repo_name)

    # A 40-character hex ref is treated as a commit SHA; anything else is
    # fetched as a branch head.
    if len(branch) == 40 and all(c in string.hexdigits for c in branch):
        zip_url = f"https://github.com/{slug}/archive/{branch}.zip"
    else:
        zip_url = f"https://github.com/{slug}/archive/refs/heads/{branch}.zip"
    log.append(f"🔍 Downloading ZIP from {zip_url}")

    try:
        with urllib.request.urlopen(zip_url) as response:
            status = response.status
            data = response.read() if status == 200 else b""
    except Exception as e:
        raise RuntimeError(f"❌ Failed to download ZIP: {e}") from e
    if status != 200:
        raise RuntimeError(f"❌ Failed to download ZIP: HTTP {status}")

    log.append("📦 Extracting ZIP contents...")
    staging_dir = tempfile.mkdtemp(prefix=".checkout-", dir=mds_base_dir)
    try:
        try:
            with zipfile.ZipFile(io.BytesIO(data)) as z:
                names = z.namelist()
                if not names:
                    raise RuntimeError("❌ ZIP file is empty.")
                common_prefix = _archive_root(names)
                desired_prefix = (
                    f"{common_prefix}{subdir_clean}/" if subdir_clean else common_prefix
                )
                file_count = _extract_members(z, desired_prefix, staging_dir)
        except RuntimeError:
            # Already a user-facing message; do not wrap it a second time.
            raise
        except zipfile.BadZipFile as e:
            raise RuntimeError(
                "❌ Failed to extract ZIP: File is corrupted or not a ZIP file."
            ) from e
        except Exception as e:
            raise RuntimeError(f"❌ Failed to extract ZIP: {e}") from e

        if not file_count:
            if subdir_clean:
                raise RuntimeError(
                    f"⚠ Directory not found in ZIP: '{subdir_clean}' under prefix '{common_prefix}'"
                )
            raise RuntimeError(
                "❌ Failed to extract any files from the repository root."
            )

        try:
            os.rename(staging_dir, repo_name)
        except OSError as e:
            if not (os.path.isdir(repo_name) and os.listdir(repo_name)):
                raise RuntimeError(
                    f"❌ Failed to move checkout into place at {repo_name}: {e}"
                ) from e
            # Another process completed the same checkout first; use its copy.
            shutil.rmtree(staging_dir, ignore_errors=True)
    except BaseException:
        # Covers errors and interrupts alike: never leave staging data behind.
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise

    log.append(f"✅ Successfully checked out to {repo_name}")
    print("\n".join(log), file=sys.stderr)
    return slug, branch, subdir, repo_name
def is_plain_text_file(filepath: str) -> bool:
    """Tell whether *filepath* can be opened and its start decoded as UTF-8.

    Only the first 1024 characters are read. Any failure, whether a decoding
    error, a missing file or a permission problem, yields ``False``.
    """
    try:
        with open(filepath, mode="r", encoding="utf-8") as handle:
            handle.read(1024)
    except Exception:
        # UnicodeDecodeError and every other failure mean "not plain text".
        return False
    else:
        return True
def search_docs_files(query: str, current_dir: Optional[str] = None) -> List[str]:
    """Recursively collect text files whose *name* contains *query*.

    The match is a case-insensitive literal substring test on the file name
    only (not the path, not the contents). Symbolic links are neither
    followed nor reported. Files that ``is_plain_text_file`` rejects are
    left out.

    Args:
        query: Literal text to look for in file names.
        current_dir: Directory to search; defaults to ``DOCS_DIR``.

    Returns:
        Full paths of the matching files. Empty when *current_dir* is not a
        directory.

    Raises:
        OSError: If *current_dir* itself cannot be listed. Sub-directories
            that cannot be listed are reported on stderr and skipped.
    """
    if current_dir is None:
        current_dir = DOCS_DIR
    if not os.path.isdir(current_dir):
        if current_dir == DOCS_DIR:
            print(
                f"Warning: Base directory {DOCS_DIR} does not exist. No files to search.",
                file=sys.stderr,
            )
        return []

    # Compiled once per directory instead of once per entry.
    name_pattern = re.compile(re.escape(query), re.IGNORECASE)
    results: List[str] = []
    try:
        with os.scandir(current_dir) as entries:
            for entry in entries:
                full_path = os.path.join(current_dir, entry.name)
                try:
                    if entry.is_dir(follow_symlinks=False):
                        # follow_symlinks=False avoids infinite loops through links.
                        results.extend(search_docs_files(query, full_path))
                    elif (
                        entry.is_file(follow_symlinks=False)
                        and name_pattern.search(entry.name)
                        and is_plain_text_file(full_path)
                    ):
                        results.append(full_path)
                except OSError as e:
                    print(f"Skipping {full_path}: Cannot access ({e})", file=sys.stderr)
    except OSError as e:
        # Raised as OSError (not bare Exception) so that, when this happens in
        # a recursive call, the per-entry handler above skips just that
        # sub-directory instead of the whole search being aborted.
        raise OSError(f"Failed to read directory {current_dir}: {e}") from e
    return results
# --- Tool Definitions ---
def docs_search_tool(query: str, repo_dir: Optional[str] = None) -> str:
    """Search file names for *query* and format the result as tool output.

    Args:
        query: Text to look for in file names.
        repo_dir: Directory to search. When omitted or empty, ``DOCS_DIR``
            is searched instead.

    Returns:
        The matching paths joined by newlines, ``"No matching files found."``
        when there are none, or ``"An error occurred: ..."`` if the search
        raised. Errors are returned as text, never raised.
    """
    try:
        results = search_docs_files(query, repo_dir or DOCS_DIR)
    except Exception as e:
        return f"An error occurred: {e}"
    return "\n".join(results) if results else "No matching files found."
def run_checkout(url: str) -> str:
    """Run ``checkout`` for *url* and describe the outcome as a string.

    A ``RuntimeError`` from ``checkout`` is returned as its own message; any
    other exception is returned with an "unexpected error" prefix. Nothing
    is raised to the caller.
    """
    try:
        slug, branch, subdir, repo_path = checkout(url)
    except RuntimeError as err:
        # checkout() already produced a user-facing message.
        return str(err)
    except Exception as err:
        return f"An unexpected error occurred: {err}"
    shown_subdir = subdir or "/"
    return f"Checkout successful: {slug} (Branch: {branch}, Subdir: {shown_subdir}) -> {repo_path}"
def read_file_tool(path: str) -> str:
    """Return the UTF-8 text of a file located inside ``DOCS_DIR``.

    Args:
        path: Path of the file to read. Relative paths are resolved against
            the current working directory.

    Returns:
        The file contents, or a human-readable error message when the path
        lies outside ``DOCS_DIR``, is not a regular file, is not UTF-8, or
        cannot be read. Errors are returned as text, never raised.
    """
    try:
        # realpath (not abspath) so a symlink inside DOCS_DIR cannot be used
        # to reach files outside of it.
        base_dir = os.path.realpath(DOCS_DIR)
        abs_path = os.path.realpath(path)
        try:
            # Component-wise comparison: a plain startswith() would also
            # accept sibling paths such as "~/.mds-other/secret".
            inside = os.path.commonpath([base_dir, abs_path]) == base_dir
        except ValueError:
            # e.g. paths on different drives.
            inside = False
        if not inside:
            return f"Error: The specified path is outside the allowed directory ({DOCS_DIR})."
        if not os.path.isfile(abs_path):
            return "Error: The specified path is not a file."
        with open(abs_path, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        return "File not found."
    except UnicodeDecodeError:
        return (
            "An error occurred while reading the file: The file is not UTF-8 encoded."
        )
    except Exception as e:
        return f"An error occurred while reading the file: {e}"
def main() -> None:
    """Check out the repository given on the command line and serve it over MCP.

    Registers two tools named after the repository slug: a file-name search
    limited to the checked-out directory, and a file reader. Then starts the
    FastMCP server.

    All diagnostics go to stderr. The example configuration in this file
    launches the server with the "stdio" transport, where stdout carries the
    protocol messages, so status banners must not be printed there.

    Exits with status 1 on a missing argument or any start-up failure.
    """
    if len(sys.argv) < 2:
        print(f"Usage: python {sys.argv[0]} <GitHub Repository URL>", file=sys.stderr)
        print(
            "Example: python unified_server.py https://github.com/owner/repo/tree/main/docs",
            file=sys.stderr,
        )
        sys.exit(1)
    url = sys.argv[1]
    try:
        # 1. Perform the initial checkout specified by the command line argument
        print(f"--- Initial Checkout for {url} ---", file=sys.stderr)
        slug, _branch, _subdir, repo_path = checkout(url)
        print(f"--- Checkout completed for {slug} to {repo_path} ---", file=sys.stderr)

        # 2. Register the tools, named after the repository
        sanitized_slug = slug.replace("/", "_")
        search_tool_name = f"mds_search_{sanitized_slug}"
        read_tool_name = f"mds_read_{sanitized_slug}"
        tool_description = (
            "\n"
            f"Search for files containing <query> in their filename within the specific repository's documents: {repo_path}\n"
            "and return the list of their paths.\n"
        )

        # Closure that pre-fills the repo_dir argument with this checkout.
        def specific_docs_search_tool(query: str) -> str:
            return docs_search_tool(query, repo_path)

        mcp = FastMCP("MDS(MPC Documents Server) Server")
        # Keyword arguments keep name/description bound to the right
        # parameters regardless of their position in add_tool's signature.
        mcp.add_tool(
            specific_docs_search_tool,
            name=search_tool_name,
            description=tool_description,
        )
        print(f"--- Added tool '{search_tool_name}' ---", file=sys.stderr)
        mcp.add_tool(
            read_file_tool,
            name=read_tool_name,
            description="Returns the contents of the file at the specified path.",
        )
        print(f"--- Added tool '{read_tool_name}' ---", file=sys.stderr)

        # 3. Start the main server
        print("--- Starting MCP Server ---", file=sys.stderr)
        mcp.run()
    except RuntimeError as e:
        print(
            f"\n❌ An error occurred during initialization or checkout: {e}",
            file=sys.stderr,
        )
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment