Created
June 13, 2025 03:53
-
-
Save spdin/95008fd5ab368a36e0118a604723bdee to your computer and use it in GitHub Desktop.
MCP Remote Server Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import logging | |
from typing import Any, Dict, Optional | |
import modal | |
from fastapi import FastAPI, Request | |
from fastapi.responses import JSONResponse, StreamingResponse, Response | |
from pydantic import BaseModel | |
# Modal app configuration | |
app = modal.App("mcp-hello-world") | |
# Define the image with required dependencies | |
image = modal.Image.debian_slim().pip_install([ | |
"pydantic>=2.0.0", | |
"fastapi>=0.104.0" | |
]) | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# MCP Protocol Models | |
class MCPRequest(BaseModel): | |
jsonrpc: str = "2.0" | |
id: Optional[int | str] = None | |
method: str | |
params: Optional[Dict[str, Any]] = None | |
class MCPResponse(BaseModel): | |
jsonrpc: str = "2.0" | |
id: Optional[int | str] = None | |
result: Optional[Dict[str, Any]] = None | |
error: Optional[Dict[str, Any]] = None | |
class MCPTool(BaseModel): | |
name: str | |
description: str | |
inputSchema: Dict[str, Any] | |
class AdditionParams(BaseModel): | |
a: int | |
b: int | |
class MultiplicationParams(BaseModel): | |
a: int | |
b: int | |
class MCPServer: | |
def __init__(self): | |
self.tools = [ | |
MCPTool( | |
name="add", | |
description="Add two integers together and return the result.", | |
inputSchema={ | |
"type": "object", | |
"properties": { | |
"a": { | |
"type": "integer", | |
"description": "The first integer to add" | |
}, | |
"b": { | |
"type": "integer", | |
"description": "The second integer to add" | |
} | |
}, | |
"required": ["a", "b"] | |
} | |
), | |
MCPTool( | |
name="multiply", | |
description="Multiply two integers together and return the result.", | |
inputSchema={ | |
"type": "object", | |
"properties": { | |
"a": { | |
"type": "integer", | |
"description": "The first integer to multiply" | |
}, | |
"b": { | |
"type": "integer", | |
"description": "The second integer to multiply" | |
} | |
}, | |
"required": ["a", "b"] | |
} | |
) | |
] | |
async def perform_addition(self, params: AdditionParams) -> Dict[str, Any]: | |
"""Perform addition of two integers""" | |
print("Performing addition:") | |
print(f"a = {params.a}, b = {params.b}") | |
print("--------------------------------") | |
result = params.a + params.b | |
return { | |
"operation": "addition", | |
"operands": {"a": params.a, "b": params.b}, | |
"result": result, | |
"message": f"{params.a} + {params.b} = {result}" | |
} | |
async def perform_multiplication(self, params: MultiplicationParams) -> Dict[str, Any]: | |
"""Perform multiplication of two integers""" | |
print("Performing multiplication:") | |
print(f"a = {params.a}, b = {params.b}") | |
print("--------------------------------") | |
result = params.a * params.b | |
return { | |
"operation": "multiplication", | |
"operands": {"a": params.a, "b": params.b}, | |
"result": result, | |
"message": f"{params.a} * {params.b} = {result}" | |
} | |
async def handle_request(self, request: MCPRequest) -> MCPResponse: | |
"""Handle MCP requests""" | |
try: | |
if request.method == "initialize": | |
# Handle initialization request | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"protocolVersion": "2025-03-26", | |
"capabilities": { | |
"tools": {} | |
}, | |
"serverInfo": { | |
"name": "mcp-hello-world", | |
"version": "1.0.0" | |
} | |
} | |
) | |
elif request.method == "notifications/initialized": | |
return MCPResponse(id=request.id) | |
elif request.method == "tools/list": | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"tools": [tool.model_dump() for tool in self.tools] | |
} | |
) | |
elif request.method == "tools/call": | |
if not request.params: | |
raise ValueError("Missing parameters") | |
tool_name = request.params.get("name") | |
arguments = request.params.get("arguments", {}) | |
if tool_name == "add": | |
params = AdditionParams(**arguments) | |
result = await self.perform_addition(params) | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"content": [ | |
{ | |
"type": "text", | |
"text": f"{params.a} + {params.b} = {result['result']}" | |
} | |
] | |
} | |
) | |
elif tool_name == "multiply": | |
params = MultiplicationParams(**arguments) | |
result = await self.perform_multiplication(params) | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"content": [ | |
{ | |
"type": "text", | |
"text": f"{params.a} * {params.b} = {result['result']}" | |
} | |
] | |
} | |
) | |
else: | |
raise ValueError(f"Unknown tool: {tool_name}") | |
else: | |
raise ValueError(f"Unknown method: {request.method}") | |
except Exception as e: | |
logger.error(f"Error handling request: {e}") | |
return MCPResponse( | |
id=request.id, | |
error={ | |
"code": -32000, | |
"message": str(e) | |
} | |
) | |
# Modal ASGI app following the pattern from modal_update.py | |
@app.function( | |
image=image, | |
min_containers=1, | |
timeout=300, | |
memory=512 | |
) | |
@modal.asgi_app() | |
def fastapi_app(): | |
"""FastAPI application for MCP server""" | |
fastapi_app = FastAPI(title="MCP Hello World", version="1.0.0") | |
@fastapi_app.api_route("/mcp", methods=["GET", "POST"]) | |
async def mcp_endpoint(request: Request): | |
"""MCP protocol endpoint supporting both GET and POST""" | |
server = MCPServer() | |
if request.method == "GET": | |
# Handle SSE connection for GET requests | |
async def event_stream(): | |
# Send initial message | |
initialized_msg = { | |
"jsonrpc": "2.0", | |
"method": "initialized" | |
} | |
yield f"data: {json.dumps(initialized_msg)}\n\n" | |
# Periodic ping messages | |
ping_msg = { | |
"jsonrpc": "2.0", | |
"method": "ping" | |
} | |
while True: | |
await asyncio.sleep(30) | |
yield f"data: {json.dumps(ping_msg)}\n\n" | |
return StreamingResponse( | |
event_stream(), | |
media_type="text/event-stream", | |
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} | |
) | |
elif request.method == "POST": | |
# Handle JSON-RPC requests for POST | |
request_data = await request.json() | |
print(request_data) | |
try: | |
mcp_request = MCPRequest(**request_data) | |
response = await server.handle_request(mcp_request) | |
response_data = response.model_dump(exclude_none=True) | |
if "result" not in response_data and "error" not in response_data: | |
# This is a notification that doesn't need a response. | |
return Response(status_code=204) | |
return JSONResponse(content=response_data) | |
except Exception as e: | |
logger.error(f"Error processing request: {e}") | |
return JSONResponse( | |
content={ | |
"jsonrpc": "2.0", | |
"id": request_data.get("id") if 'request_data' in locals() else None, | |
"error": {"code": -32700, "message": f"Parse error: {str(e)}"} | |
}, | |
status_code=500 | |
) | |
@fastapi_app.get("/health") | |
async def health_check(): | |
"""Health check endpoint""" | |
return JSONResponse(content={"status": "healthy", "service": "mcp-hello-world"}) | |
return fastapi_app |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import logging | |
from typing import Any, Dict, Optional | |
from urllib.parse import urlparse | |
import aiohttp | |
import modal | |
from fastapi import FastAPI, Request | |
from fastapi.responses import JSONResponse, StreamingResponse, Response | |
from pydantic import BaseModel | |
# Modal app configuration | |
app = modal.App("mcp-url-fetcher") | |
# Define the image with required dependencies | |
image = modal.Image.debian_slim().pip_install([ | |
"aiohttp>=3.8.0", | |
"pydantic>=2.0.0", | |
"beautifulsoup4>=4.12.0", | |
"lxml>=4.9.0", | |
"fastapi>=0.104.0", | |
"markdownify>=0.11.6" | |
]) | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# MCP Protocol Models | |
class MCPRequest(BaseModel): | |
jsonrpc: str = "2.0" | |
id: Optional[int | str] = None | |
method: str | |
params: Optional[Dict[str, Any]] = None | |
class MCPResponse(BaseModel): | |
jsonrpc: str = "2.0" | |
id: Optional[int | str] = None | |
result: Optional[Dict[str, Any]] = None | |
error: Optional[Dict[str, Any]] = None | |
class MCPTool(BaseModel): | |
name: str | |
description: str | |
inputSchema: Dict[str, Any] | |
class URLFetchParams(BaseModel): | |
url: str | |
class MCPServer: | |
def __init__(self): | |
self.tools = [ | |
MCPTool( | |
name="fetch_url", | |
description="Fetch content from a given URL. Supports HTML, JSON, and text content.", | |
inputSchema={ | |
"type": "object", | |
"properties": { | |
"url": { | |
"type": "string", | |
"description": "The URL to fetch content from" | |
} | |
}, | |
"required": ["url"] | |
} | |
) | |
] | |
def validate_url(self, url: str) -> bool: | |
"""Validate URL format and safety""" | |
try: | |
parsed = urlparse(url) | |
if not parsed.scheme or not parsed.netloc: | |
return False | |
# Only allow HTTP/HTTPS | |
if parsed.scheme not in ['http', 'https']: | |
return False | |
# Block localhost and private IPs for security | |
if parsed.netloc.lower() in ['localhost', '127.0.0.1', '0.0.0.0']: | |
return False | |
return True | |
except Exception: | |
return False | |
def html_to_markdown(self, html_content: str, **options) -> str: | |
from bs4 import BeautifulSoup | |
from markdownify import markdownify as md | |
# Use BeautifulSoup to isolate the main content of the page | |
soup = BeautifulSoup(html_content, 'lxml') | |
main_content = soup.find('main') or soup.find('article') or soup.body | |
if main_content: | |
# Convert only the main content to Markdown | |
return md(str(main_content), **options).strip() | |
else: | |
# Fallback for documents without a clear main tag | |
return md(html_content, **options).strip() | |
async def fetch_url_content(self, params: URLFetchParams) -> str: | |
"""Fetch content from URL with simplified parameters""" | |
if not self.validate_url(params.url): | |
raise ValueError(f"Invalid or unsafe URL: {params.url}") | |
# Use default values for all optional parameters | |
headers = {'User-Agent': 'MCP-URL-Fetcher/1.0'} | |
timeout = aiohttp.ClientTimeout(total=30) | |
max_size = 10 * 1024 * 1024 # 10MB limit | |
try: | |
async with aiohttp.ClientSession(timeout=timeout) as session: | |
async with session.get(params.url, headers=headers) as response: | |
# Check content length | |
content_length = response.headers.get('content-length') | |
if content_length and int(content_length) > max_size: | |
raise ValueError(f"Content too large: {content_length} bytes") | |
# Read content with size limit | |
content = b'' | |
async for chunk in response.content.iter_chunked(8192): | |
content += chunk | |
if len(content) > max_size: | |
raise ValueError(f"Content too large: > {max_size} bytes") | |
data = content.decode('utf-8', errors='replace') | |
data = self.html_to_markdown(data) | |
return data | |
except aiohttp.ClientError as e: | |
raise ValueError(f"Request failed: {str(e)}") | |
except asyncio.TimeoutError: | |
raise ValueError("Request timed out after 30 seconds") | |
async def handle_request(self, request: MCPRequest) -> MCPResponse: | |
"""Handle MCP requests""" | |
try: | |
if request.method == "initialize": | |
# Handle initialization request | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"protocolVersion": "2025-03-26", | |
"capabilities": { | |
"tools": {} | |
}, | |
"serverInfo": { | |
"name": "mcp-url-fetcher-simple", | |
"version": "1.0.0" | |
} | |
} | |
) | |
elif request.method == "notifications/initialized": | |
return MCPResponse(id=request.id) | |
elif request.method == "tools/list": | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"tools": [tool.model_dump() for tool in self.tools] | |
} | |
) | |
elif request.method == "tools/call": | |
if not request.params: | |
raise ValueError("Missing parameters") | |
tool_name = request.params.get("name") | |
arguments = request.params.get("arguments", {}) | |
if tool_name == "fetch_url": | |
params = URLFetchParams(**arguments) | |
result = await self.fetch_url_content(params) | |
return MCPResponse( | |
id=request.id, | |
result={ | |
"content": [ | |
{ | |
"type": "text", | |
"text": result | |
} | |
] | |
} | |
) | |
else: | |
raise ValueError(f"Unknown tool: {tool_name}") | |
else: | |
raise ValueError(f"Unknown method: {request.method}") | |
except Exception as e: | |
logger.error(f"Error handling request: {e}") | |
return MCPResponse( | |
id=request.id, | |
error={ | |
"code": -32000, | |
"message": str(e) | |
} | |
) | |
# Modal ASGI app following the pattern from modal_update.py | |
@app.function( | |
image=image, | |
min_containers=1, | |
timeout=300, | |
memory=512 | |
) | |
@modal.asgi_app() | |
def fastapi_app(): | |
"""FastAPI application for MCP server""" | |
fastapi_app = FastAPI(title="MCP URL Fetcher Simple", version="1.0.0") | |
@fastapi_app.api_route("/mcp", methods=["GET", "POST"]) | |
async def mcp_endpoint(request: Request): | |
"""MCP protocol endpoint supporting both GET and POST""" | |
server = MCPServer() | |
if request.method == "GET": | |
# Handle SSE connection for GET requests | |
async def event_stream(): | |
# Send initial message | |
initialized_msg = { | |
"jsonrpc": "2.0", | |
"method": "initialized" | |
} | |
yield f"data: {json.dumps(initialized_msg)}\n\n" | |
# Periodic ping messages | |
ping_msg = { | |
"jsonrpc": "2.0", | |
"method": "ping" | |
} | |
while True: | |
await asyncio.sleep(30) | |
yield f"data: {json.dumps(ping_msg)}\n\n" | |
return StreamingResponse( | |
event_stream(), | |
media_type="text/event-stream", | |
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} | |
) | |
elif request.method == "POST": | |
# Handle JSON-RPC requests for POST | |
request_data = await request.json() | |
print(request_data) | |
try: | |
mcp_request = MCPRequest(**request_data) | |
response = await server.handle_request(mcp_request) | |
response_data = response.model_dump(exclude_none=True) | |
if "result" not in response_data and "error" not in response_data: | |
# This is a notification that doesn't need a response. | |
return Response(status_code=204) | |
return JSONResponse(content=response_data) | |
except Exception as e: | |
logger.error(f"Error processing request: {e}") | |
return JSONResponse( | |
content={ | |
"jsonrpc": "2.0", | |
"id": request_data.get("id") if 'request_data' in locals() else None, | |
"error": {"code": -32700, "message": f"Parse error: {str(e)}"} | |
}, | |
status_code=500 | |
) | |
@fastapi_app.get("/health") | |
async def health_check(): | |
"""Health check endpoint""" | |
return JSONResponse(content={"status": "healthy", "service": "mcp-url-fetcher-simple"}) | |
return fastapi_app |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment