Skip to content

Instantly share code, notes, and snippets.

@spdin
Created June 13, 2025 03:53
Show Gist options
  • Save spdin/95008fd5ab368a36e0118a604723bdee to your computer and use it in GitHub Desktop.
Save spdin/95008fd5ab368a36e0118a604723bdee to your computer and use it in GitHub Desktop.
MCP Remote Server Example
import asyncio
import json
import logging
from typing import Any, Dict, Optional
import modal
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse, Response
from pydantic import BaseModel
# Modal app configuration
app = modal.App("mcp-hello-world")
# Define the image with required dependencies
image = modal.Image.debian_slim().pip_install([
"pydantic>=2.0.0",
"fastapi>=0.104.0"
])
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# MCP Protocol Models
class MCPRequest(BaseModel):
jsonrpc: str = "2.0"
id: Optional[int | str] = None
method: str
params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
jsonrpc: str = "2.0"
id: Optional[int | str] = None
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
class MCPTool(BaseModel):
name: str
description: str
inputSchema: Dict[str, Any]
class AdditionParams(BaseModel):
a: int
b: int
class MultiplicationParams(BaseModel):
a: int
b: int
class MCPServer:
def __init__(self):
self.tools = [
MCPTool(
name="add",
description="Add two integers together and return the result.",
inputSchema={
"type": "object",
"properties": {
"a": {
"type": "integer",
"description": "The first integer to add"
},
"b": {
"type": "integer",
"description": "The second integer to add"
}
},
"required": ["a", "b"]
}
),
MCPTool(
name="multiply",
description="Multiply two integers together and return the result.",
inputSchema={
"type": "object",
"properties": {
"a": {
"type": "integer",
"description": "The first integer to multiply"
},
"b": {
"type": "integer",
"description": "The second integer to multiply"
}
},
"required": ["a", "b"]
}
)
]
async def perform_addition(self, params: AdditionParams) -> Dict[str, Any]:
"""Perform addition of two integers"""
print("Performing addition:")
print(f"a = {params.a}, b = {params.b}")
print("--------------------------------")
result = params.a + params.b
return {
"operation": "addition",
"operands": {"a": params.a, "b": params.b},
"result": result,
"message": f"{params.a} + {params.b} = {result}"
}
async def perform_multiplication(self, params: MultiplicationParams) -> Dict[str, Any]:
"""Perform multiplication of two integers"""
print("Performing multiplication:")
print(f"a = {params.a}, b = {params.b}")
print("--------------------------------")
result = params.a * params.b
return {
"operation": "multiplication",
"operands": {"a": params.a, "b": params.b},
"result": result,
"message": f"{params.a} * {params.b} = {result}"
}
async def handle_request(self, request: MCPRequest) -> MCPResponse:
"""Handle MCP requests"""
try:
if request.method == "initialize":
# Handle initialization request
return MCPResponse(
id=request.id,
result={
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "mcp-hello-world",
"version": "1.0.0"
}
}
)
elif request.method == "notifications/initialized":
return MCPResponse(id=request.id)
elif request.method == "tools/list":
return MCPResponse(
id=request.id,
result={
"tools": [tool.model_dump() for tool in self.tools]
}
)
elif request.method == "tools/call":
if not request.params:
raise ValueError("Missing parameters")
tool_name = request.params.get("name")
arguments = request.params.get("arguments", {})
if tool_name == "add":
params = AdditionParams(**arguments)
result = await self.perform_addition(params)
return MCPResponse(
id=request.id,
result={
"content": [
{
"type": "text",
"text": f"{params.a} + {params.b} = {result['result']}"
}
]
}
)
elif tool_name == "multiply":
params = MultiplicationParams(**arguments)
result = await self.perform_multiplication(params)
return MCPResponse(
id=request.id,
result={
"content": [
{
"type": "text",
"text": f"{params.a} * {params.b} = {result['result']}"
}
]
}
)
else:
raise ValueError(f"Unknown tool: {tool_name}")
else:
raise ValueError(f"Unknown method: {request.method}")
except Exception as e:
logger.error(f"Error handling request: {e}")
return MCPResponse(
id=request.id,
error={
"code": -32000,
"message": str(e)
}
)
# Modal ASGI app following the pattern from modal_update.py
@app.function(
image=image,
min_containers=1,
timeout=300,
memory=512
)
@modal.asgi_app()
def fastapi_app():
"""FastAPI application for MCP server"""
fastapi_app = FastAPI(title="MCP Hello World", version="1.0.0")
@fastapi_app.api_route("/mcp", methods=["GET", "POST"])
async def mcp_endpoint(request: Request):
"""MCP protocol endpoint supporting both GET and POST"""
server = MCPServer()
if request.method == "GET":
# Handle SSE connection for GET requests
async def event_stream():
# Send initial message
initialized_msg = {
"jsonrpc": "2.0",
"method": "initialized"
}
yield f"data: {json.dumps(initialized_msg)}\n\n"
# Periodic ping messages
ping_msg = {
"jsonrpc": "2.0",
"method": "ping"
}
while True:
await asyncio.sleep(30)
yield f"data: {json.dumps(ping_msg)}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
elif request.method == "POST":
# Handle JSON-RPC requests for POST
request_data = await request.json()
print(request_data)
try:
mcp_request = MCPRequest(**request_data)
response = await server.handle_request(mcp_request)
response_data = response.model_dump(exclude_none=True)
if "result" not in response_data and "error" not in response_data:
# This is a notification that doesn't need a response.
return Response(status_code=204)
return JSONResponse(content=response_data)
except Exception as e:
logger.error(f"Error processing request: {e}")
return JSONResponse(
content={
"jsonrpc": "2.0",
"id": request_data.get("id") if 'request_data' in locals() else None,
"error": {"code": -32700, "message": f"Parse error: {str(e)}"}
},
status_code=500
)
@fastapi_app.get("/health")
async def health_check():
"""Health check endpoint"""
return JSONResponse(content={"status": "healthy", "service": "mcp-hello-world"})
return fastapi_app
import asyncio
import json
import logging
from typing import Any, Dict, Optional
from urllib.parse import urlparse
import aiohttp
import modal
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse, Response
from pydantic import BaseModel
# Modal app configuration
app = modal.App("mcp-url-fetcher")
# Define the image with required dependencies
image = modal.Image.debian_slim().pip_install([
"aiohttp>=3.8.0",
"pydantic>=2.0.0",
"beautifulsoup4>=4.12.0",
"lxml>=4.9.0",
"fastapi>=0.104.0",
"markdownify>=0.11.6"
])
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# MCP Protocol Models
class MCPRequest(BaseModel):
jsonrpc: str = "2.0"
id: Optional[int | str] = None
method: str
params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
jsonrpc: str = "2.0"
id: Optional[int | str] = None
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
class MCPTool(BaseModel):
name: str
description: str
inputSchema: Dict[str, Any]
class URLFetchParams(BaseModel):
url: str
class MCPServer:
def __init__(self):
self.tools = [
MCPTool(
name="fetch_url",
description="Fetch content from a given URL. Supports HTML, JSON, and text content.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch content from"
}
},
"required": ["url"]
}
)
]
def validate_url(self, url: str) -> bool:
"""Validate URL format and safety"""
try:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return False
# Only allow HTTP/HTTPS
if parsed.scheme not in ['http', 'https']:
return False
# Block localhost and private IPs for security
if parsed.netloc.lower() in ['localhost', '127.0.0.1', '0.0.0.0']:
return False
return True
except Exception:
return False
def html_to_markdown(self, html_content: str, **options) -> str:
from bs4 import BeautifulSoup
from markdownify import markdownify as md
# Use BeautifulSoup to isolate the main content of the page
soup = BeautifulSoup(html_content, 'lxml')
main_content = soup.find('main') or soup.find('article') or soup.body
if main_content:
# Convert only the main content to Markdown
return md(str(main_content), **options).strip()
else:
# Fallback for documents without a clear main tag
return md(html_content, **options).strip()
async def fetch_url_content(self, params: URLFetchParams) -> str:
"""Fetch content from URL with simplified parameters"""
if not self.validate_url(params.url):
raise ValueError(f"Invalid or unsafe URL: {params.url}")
# Use default values for all optional parameters
headers = {'User-Agent': 'MCP-URL-Fetcher/1.0'}
timeout = aiohttp.ClientTimeout(total=30)
max_size = 10 * 1024 * 1024 # 10MB limit
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(params.url, headers=headers) as response:
# Check content length
content_length = response.headers.get('content-length')
if content_length and int(content_length) > max_size:
raise ValueError(f"Content too large: {content_length} bytes")
# Read content with size limit
content = b''
async for chunk in response.content.iter_chunked(8192):
content += chunk
if len(content) > max_size:
raise ValueError(f"Content too large: > {max_size} bytes")
data = content.decode('utf-8', errors='replace')
data = self.html_to_markdown(data)
return data
except aiohttp.ClientError as e:
raise ValueError(f"Request failed: {str(e)}")
except asyncio.TimeoutError:
raise ValueError("Request timed out after 30 seconds")
async def handle_request(self, request: MCPRequest) -> MCPResponse:
"""Handle MCP requests"""
try:
if request.method == "initialize":
# Handle initialization request
return MCPResponse(
id=request.id,
result={
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "mcp-url-fetcher-simple",
"version": "1.0.0"
}
}
)
elif request.method == "notifications/initialized":
return MCPResponse(id=request.id)
elif request.method == "tools/list":
return MCPResponse(
id=request.id,
result={
"tools": [tool.model_dump() for tool in self.tools]
}
)
elif request.method == "tools/call":
if not request.params:
raise ValueError("Missing parameters")
tool_name = request.params.get("name")
arguments = request.params.get("arguments", {})
if tool_name == "fetch_url":
params = URLFetchParams(**arguments)
result = await self.fetch_url_content(params)
return MCPResponse(
id=request.id,
result={
"content": [
{
"type": "text",
"text": result
}
]
}
)
else:
raise ValueError(f"Unknown tool: {tool_name}")
else:
raise ValueError(f"Unknown method: {request.method}")
except Exception as e:
logger.error(f"Error handling request: {e}")
return MCPResponse(
id=request.id,
error={
"code": -32000,
"message": str(e)
}
)
# Modal ASGI app following the pattern from modal_update.py
@app.function(
image=image,
min_containers=1,
timeout=300,
memory=512
)
@modal.asgi_app()
def fastapi_app():
"""FastAPI application for MCP server"""
fastapi_app = FastAPI(title="MCP URL Fetcher Simple", version="1.0.0")
@fastapi_app.api_route("/mcp", methods=["GET", "POST"])
async def mcp_endpoint(request: Request):
"""MCP protocol endpoint supporting both GET and POST"""
server = MCPServer()
if request.method == "GET":
# Handle SSE connection for GET requests
async def event_stream():
# Send initial message
initialized_msg = {
"jsonrpc": "2.0",
"method": "initialized"
}
yield f"data: {json.dumps(initialized_msg)}\n\n"
# Periodic ping messages
ping_msg = {
"jsonrpc": "2.0",
"method": "ping"
}
while True:
await asyncio.sleep(30)
yield f"data: {json.dumps(ping_msg)}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
elif request.method == "POST":
# Handle JSON-RPC requests for POST
request_data = await request.json()
print(request_data)
try:
mcp_request = MCPRequest(**request_data)
response = await server.handle_request(mcp_request)
response_data = response.model_dump(exclude_none=True)
if "result" not in response_data and "error" not in response_data:
# This is a notification that doesn't need a response.
return Response(status_code=204)
return JSONResponse(content=response_data)
except Exception as e:
logger.error(f"Error processing request: {e}")
return JSONResponse(
content={
"jsonrpc": "2.0",
"id": request_data.get("id") if 'request_data' in locals() else None,
"error": {"code": -32700, "message": f"Parse error: {str(e)}"}
},
status_code=500
)
@fastapi_app.get("/health")
async def health_check():
"""Health check endpoint"""
return JSONResponse(content={"status": "healthy", "service": "mcp-url-fetcher-simple"})
return fastapi_app
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment