Skip to content

Instantly share code, notes, and snippets.

@moosh3
Last active March 24, 2025 14:51
Show Gist options
  • Save moosh3/0e3ca453918b82761aed9fb0e0152c6d to your computer and use it in GitHub Desktop.
Save moosh3/0e3ca453918b82761aed9fb0e0152c6d to your computer and use it in GitHub Desktop.
mcp
# api/mcp/__init__.py
from fastapi import FastAPI
from api.mcp.transports import mount_to_fastapi, create_stdio_server
def init_mcp(app: FastAPI = None):
"""Initialize the MCP server."""
if app:
# If a FastAPI app is provided, mount the MCP endpoints
mount_to_fastapi(app)
return None
else:
# Otherwise, create a standalone stdio server
return create_stdio_server()
# api/mcp/auth.py
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from api import models, database
from api.dependencies import get_user_from_token
from datetime import datetime
def verify_mcp_token(token: str, db: Session = Depends(database.get_db)):
"""Verify an MCP token and return the associated user."""
# Query the MCPToken model
mcp_token = db.query(models.MCPToken).filter(
models.MCPToken.token == token,
models.MCPToken.is_revoked == False
).first()
if not mcp_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid MCP token"
)
# Check if token is expired
if mcp_token.expires_at and mcp_token.expires_at < datetime.now():
# Token is expired
mcp_token.is_revoked = True
db.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="MCP token expired"
)
# Update last used timestamp
mcp_token.last_used_at = datetime.now()
db.commit()
# Return the associated user
return mcp_token.user
# api/mcp/server.py
from mcp.server.fastmcp import FastMCP
from mcp.types import LogLevel
from sqlalchemy.orm import Session
from api import models, database
from api.dependencies import get_user_from_token
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)
class MCPAggregatorServer:
"""MCP Server for the MCP Aggregator application."""
def __init__(self, db: Session):
self.db = db
self.mcp = FastMCP("MCP-Aggregator")
self.setup_tools()
def setup_tools(self):
"""Register all tools from the database."""
# Get all available tools from the database
tools = self.db.query(models.Tool).all()
for tool in tools:
# Register each tool with the MCP server
self._register_tool(tool)
def _register_tool(self, tool: models.Tool):
"""Register a single tool with the MCP server."""
app = tool.app # Get the associated app
# Define the tool handler function
async def tool_handler(**kwargs):
"""Dynamic handler for the tool."""
try:
# Here you would implement the actual execution of the tool
# This could involve calling your existing tool execution logic
from api.routers.tools import execute_specific_tool
# Execute the tool using your existing logic
result = execute_specific_tool(tool.id, self.db, app.owner)
return result
except Exception as e:
logger.error(f"Error executing tool {tool.name}: {str(e)}")
# Return an error in the expected MCP format
return {
"isError": True,
"content": [{"type": "text", "text": f"Error: {str(e)}"}]
}
# Register the tool with MCP
self.mcp.register_tool(
name=f"{app.name}.{tool.name}",
description=tool.description or "",
parameters=tool.parameters, # Assuming this is in JSONSchema format
handler=tool_handler
)
def get_mcp_server(self):
"""Return the FastMCP server instance."""
return self.mcp
# api/mcp/tools.py
from mcp.types import Tool as MCPTool, ToolParameter
from api import models, schemas
from sqlalchemy.orm import Session
from typing import Dict, Any, List, Optional, Callable
import inspect
import json
import logging
logger = logging.getLogger(__name__)
def convert_tool_to_mcp_format(tool: models.Tool) -> MCPTool:
"""
Convert a database Tool model to MCP Tool format.
Args:
tool: The database Tool model to convert
Returns:
An MCP-compatible Tool definition
"""
# Extract parameters from the tool's parameters JSON
parameters = {}
if tool.parameters:
for param_name, param_schema in tool.parameters.items():
parameters[param_name] = ToolParameter(
type=param_schema.get("type", "string"),
description=param_schema.get("description", ""),
required=param_schema.get("required", False),
enum=param_schema.get("enum", None),
default=param_schema.get("default", None)
)
return MCPTool(
name=f"{tool.app.name}.{tool.name}",
description=tool.description or "",
parameters=parameters
)
async def execute_tool_with_mcp(
tool_id: int,
args: Dict[str, Any],
db: Session,
user: models.User
) -> Dict[str, Any]:
"""
Execute a tool using the existing execution logic but format result for MCP.
Args:
tool_id: The ID of the tool to execute
args: Arguments to pass to the tool
db: Database session
user: The user executing the tool
Returns:
MCP-formatted result
"""
try:
# Import the execution function from the existing tools router
from api.routers.tools import execute_specific_tool
# Execute the tool using existing logic
result = execute_specific_tool(tool_id, db, user)
# Format the result for MCP
return format_result_for_mcp(result)
except Exception as e:
logger.error(f"Error executing tool {tool_id}: {str(e)}")
return {
"isError": True,
"content": [{"type": "text", "text": f"Error: {str(e)}"}]
}
def format_result_for_mcp(result: Any) -> Dict[str, Any]:
"""
Format a tool execution result for MCP.
Args:
result: The result from tool execution
Returns:
MCP-formatted result
"""
# If result is already an error
if isinstance(result, dict) and result.get("error"):
return {
"isError": True,
"content": [{"type": "text", "text": result["error"]}]
}
# If result is a simple type (string, int, etc.)
if isinstance(result, (str, int, float, bool)):
return {
"isError": False,
"content": [{"type": "text", "text": str(result)}]
}
# If result is a list
if isinstance(result, list):
# Try to convert to a nicely formatted text
try:
formatted_text = json.dumps(result, indent=2)
return {
"isError": False,
"content": [{"type": "text", "text": formatted_text}]
}
except:
return {
"isError": False,
"content": [{"type": "text", "text": str(result)}]
}
# If result is a dict
if isinstance(result, dict):
# Check if it's already in MCP format
if "content" in result and ("isError" in result or "is_error" in result):
return result
# Try to convert to a nicely formatted text
try:
formatted_text = json.dumps(result, indent=2)
return {
"isError": False,
"content": [{"type": "text", "text": formatted_text}]
}
except:
return {
"isError": False,
"content": [{"type": "text", "text": str(result)}]
}
# Default case
return {
"isError": False,
"content": [{"type": "text", "text": str(result)}]
}
def get_all_available_tools(db: Session, user: models.User) -> List[MCPTool]:
"""
Get all tools available to a specific user in MCP format.
Args:
db: Database session
user: The user to get tools for
Returns:
List of MCP-compatible Tool definitions
"""
# Get all the apps owned by the user
apps = db.query(models.App).filter(models.App.owner_id == user.id).all()
# Get all tools for these apps
tools = []
for app in apps:
app_tools = db.query(models.Tool).filter(models.Tool.app_id == app.id).all()
tools.extend(app_tools)
# Convert to MCP format
return [convert_tool_to_mcp_format(tool) for tool in tools]
def register_dynamic_tool(
mcp_server: Any,
tool: models.Tool,
db: Session,
execute_function: Callable = execute_tool_with_mcp
):
"""
Dynamically register a tool with the MCP server.
Args:
mcp_server: The MCP server instance
tool: The tool to register
db: Database session
execute_function: The function to use for executing the tool
"""
# Create a dynamic handler function for this specific tool
async def tool_handler(**kwargs):
# Get user context (would need to be set elsewhere)
from fastapi import Request
request = kwargs.get("_request", None)
user = getattr(request, "user", None)
if not user:
return {
"isError": True,
"content": [{"type": "text", "text": "Authentication required"}]
}
# Execute the tool
return await execute_function(tool.id, kwargs, db, user)
# Register the tool with the MCP server
mcp_tool = convert_tool_to_mcp_format(tool)
mcp_server.register_tool(
name=mcp_tool.name,
description=mcp_tool.description,
parameters=mcp_tool.parameters,
handler=tool_handler
)
# api/mcp/transports.py
from mcp.server.transports import StdioTransport, HttpTransport, WebsocketTransport
from fastapi import FastAPI, Depends, HTTPException, WebSocket
from sqlalchemy.orm import Session
from api import database
from api.mcp.auth import verify_mcp_token
from api.mcp.server import MCPAggregatorServer
import asyncio
def create_stdio_server():
"""Create an MCP server with stdio transport for local usage."""
db = next(database.get_db())
server = MCPAggregatorServer(db).get_mcp_server()
# Use stdio transport
transport = StdioTransport()
server.serve(transport)
return server
def mount_to_fastapi(app: FastAPI):
"""Mount MCP transports to an existing FastAPI application."""
# HTTP transport endpoint
@app.post("/api/v1/mcp-transport/http")
async def mcp_http_endpoint(request: dict, db: Session = Depends(database.get_db)):
"""HTTP transport endpoint for MCP."""
# Get token from request header
token = request.headers.get("Authorization", "").replace("Bearer ", "")
user = verify_mcp_token(token, db)
# Create MCP server for this user
server = MCPAggregatorServer(db).get_mcp_server()
# Use HTTP transport
transport = HttpTransport()
return await server.handle_request(request, transport)
# WebSocket transport endpoint
@app.websocket("/api/v1/mcp-transport/ws")
async def mcp_websocket_endpoint(websocket: WebSocket, db: Session = Depends(database.get_db)):
"""WebSocket transport for MCP."""
await websocket.accept()
# Get token from query parameter or initial message
token = websocket.query_params.get("token", "")
try:
user = verify_mcp_token(token, db)
# Create MCP server for this user
server = MCPAggregatorServer(db).get_mcp_server()
# Use WebSocket transport
transport = WebsocketTransport(websocket)
await server.serve(transport)
except HTTPException:
await websocket.close(code=1008) # Policy violation
except Exception as e:
await websocket.close(code=1011) # Internal error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment