Last active
March 24, 2025 14:51
-
-
Save moosh3/0e3ca453918b82761aed9fb0e0152c6d to your computer and use it in GitHub Desktop.
mcp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api/mcp/__init__.py | |
from fastapi import FastAPI | |
from api.mcp.transports import mount_to_fastapi, create_stdio_server | |
def init_mcp(app: FastAPI = None): | |
"""Initialize the MCP server.""" | |
if app: | |
# If a FastAPI app is provided, mount the MCP endpoints | |
mount_to_fastapi(app) | |
return None | |
else: | |
# Otherwise, create a standalone stdio server | |
return create_stdio_server() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api/mcp/auth.py | |
from fastapi import Depends, HTTPException, status | |
from sqlalchemy.orm import Session | |
from api import models, database | |
from api.dependencies import get_user_from_token | |
from datetime import datetime | |
def verify_mcp_token(token: str, db: Session = Depends(database.get_db)): | |
"""Verify an MCP token and return the associated user.""" | |
# Query the MCPToken model | |
mcp_token = db.query(models.MCPToken).filter( | |
models.MCPToken.token == token, | |
models.MCPToken.is_revoked == False | |
).first() | |
if not mcp_token: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Invalid MCP token" | |
) | |
# Check if token is expired | |
if mcp_token.expires_at and mcp_token.expires_at < datetime.now(): | |
# Token is expired | |
mcp_token.is_revoked = True | |
db.commit() | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="MCP token expired" | |
) | |
# Update last used timestamp | |
mcp_token.last_used_at = datetime.now() | |
db.commit() | |
# Return the associated user | |
return mcp_token.user |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api/mcp/server.py | |
from mcp.server.fastmcp import FastMCP | |
from mcp.types import LogLevel | |
from sqlalchemy.orm import Session | |
from api import models, database | |
from api.dependencies import get_user_from_token | |
from typing import Dict, Any, List, Optional | |
import logging | |
logger = logging.getLogger(__name__) | |
class MCPAggregatorServer: | |
"""MCP Server for the MCP Aggregator application.""" | |
def __init__(self, db: Session): | |
self.db = db | |
self.mcp = FastMCP("MCP-Aggregator") | |
self.setup_tools() | |
def setup_tools(self): | |
"""Register all tools from the database.""" | |
# Get all available tools from the database | |
tools = self.db.query(models.Tool).all() | |
for tool in tools: | |
# Register each tool with the MCP server | |
self._register_tool(tool) | |
def _register_tool(self, tool: models.Tool): | |
"""Register a single tool with the MCP server.""" | |
app = tool.app # Get the associated app | |
# Define the tool handler function | |
async def tool_handler(**kwargs): | |
"""Dynamic handler for the tool.""" | |
try: | |
# Here you would implement the actual execution of the tool | |
# This could involve calling your existing tool execution logic | |
from api.routers.tools import execute_specific_tool | |
# Execute the tool using your existing logic | |
result = execute_specific_tool(tool.id, self.db, app.owner) | |
return result | |
except Exception as e: | |
logger.error(f"Error executing tool {tool.name}: {str(e)}") | |
# Return an error in the expected MCP format | |
return { | |
"isError": True, | |
"content": [{"type": "text", "text": f"Error: {str(e)}"}] | |
} | |
# Register the tool with MCP | |
self.mcp.register_tool( | |
name=f"{app.name}.{tool.name}", | |
description=tool.description or "", | |
parameters=tool.parameters, # Assuming this is in JSONSchema format | |
handler=tool_handler | |
) | |
def get_mcp_server(self): | |
"""Return the FastMCP server instance.""" | |
return self.mcp |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api/mcp/tools.py | |
from mcp.types import Tool as MCPTool, ToolParameter | |
from api import models, schemas | |
from sqlalchemy.orm import Session | |
from typing import Dict, Any, List, Optional, Callable | |
import inspect | |
import json | |
import logging | |
logger = logging.getLogger(__name__) | |
def convert_tool_to_mcp_format(tool: models.Tool) -> MCPTool: | |
""" | |
Convert a database Tool model to MCP Tool format. | |
Args: | |
tool: The database Tool model to convert | |
Returns: | |
An MCP-compatible Tool definition | |
""" | |
# Extract parameters from the tool's parameters JSON | |
parameters = {} | |
if tool.parameters: | |
for param_name, param_schema in tool.parameters.items(): | |
parameters[param_name] = ToolParameter( | |
type=param_schema.get("type", "string"), | |
description=param_schema.get("description", ""), | |
required=param_schema.get("required", False), | |
enum=param_schema.get("enum", None), | |
default=param_schema.get("default", None) | |
) | |
return MCPTool( | |
name=f"{tool.app.name}.{tool.name}", | |
description=tool.description or "", | |
parameters=parameters | |
) | |
async def execute_tool_with_mcp( | |
tool_id: int, | |
args: Dict[str, Any], | |
db: Session, | |
user: models.User | |
) -> Dict[str, Any]: | |
""" | |
Execute a tool using the existing execution logic but format result for MCP. | |
Args: | |
tool_id: The ID of the tool to execute | |
args: Arguments to pass to the tool | |
db: Database session | |
user: The user executing the tool | |
Returns: | |
MCP-formatted result | |
""" | |
try: | |
# Import the execution function from the existing tools router | |
from api.routers.tools import execute_specific_tool | |
# Execute the tool using existing logic | |
result = execute_specific_tool(tool_id, db, user) | |
# Format the result for MCP | |
return format_result_for_mcp(result) | |
except Exception as e: | |
logger.error(f"Error executing tool {tool_id}: {str(e)}") | |
return { | |
"isError": True, | |
"content": [{"type": "text", "text": f"Error: {str(e)}"}] | |
} | |
def format_result_for_mcp(result: Any) -> Dict[str, Any]: | |
""" | |
Format a tool execution result for MCP. | |
Args: | |
result: The result from tool execution | |
Returns: | |
MCP-formatted result | |
""" | |
# If result is already an error | |
if isinstance(result, dict) and result.get("error"): | |
return { | |
"isError": True, | |
"content": [{"type": "text", "text": result["error"]}] | |
} | |
# If result is a simple type (string, int, etc.) | |
if isinstance(result, (str, int, float, bool)): | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": str(result)}] | |
} | |
# If result is a list | |
if isinstance(result, list): | |
# Try to convert to a nicely formatted text | |
try: | |
formatted_text = json.dumps(result, indent=2) | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": formatted_text}] | |
} | |
except: | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": str(result)}] | |
} | |
# If result is a dict | |
if isinstance(result, dict): | |
# Check if it's already in MCP format | |
if "content" in result and ("isError" in result or "is_error" in result): | |
return result | |
# Try to convert to a nicely formatted text | |
try: | |
formatted_text = json.dumps(result, indent=2) | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": formatted_text}] | |
} | |
except: | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": str(result)}] | |
} | |
# Default case | |
return { | |
"isError": False, | |
"content": [{"type": "text", "text": str(result)}] | |
} | |
def get_all_available_tools(db: Session, user: models.User) -> List[MCPTool]: | |
""" | |
Get all tools available to a specific user in MCP format. | |
Args: | |
db: Database session | |
user: The user to get tools for | |
Returns: | |
List of MCP-compatible Tool definitions | |
""" | |
# Get all the apps owned by the user | |
apps = db.query(models.App).filter(models.App.owner_id == user.id).all() | |
# Get all tools for these apps | |
tools = [] | |
for app in apps: | |
app_tools = db.query(models.Tool).filter(models.Tool.app_id == app.id).all() | |
tools.extend(app_tools) | |
# Convert to MCP format | |
return [convert_tool_to_mcp_format(tool) for tool in tools] | |
def register_dynamic_tool( | |
mcp_server: Any, | |
tool: models.Tool, | |
db: Session, | |
execute_function: Callable = execute_tool_with_mcp | |
): | |
""" | |
Dynamically register a tool with the MCP server. | |
Args: | |
mcp_server: The MCP server instance | |
tool: The tool to register | |
db: Database session | |
execute_function: The function to use for executing the tool | |
""" | |
# Create a dynamic handler function for this specific tool | |
async def tool_handler(**kwargs): | |
# Get user context (would need to be set elsewhere) | |
from fastapi import Request | |
request = kwargs.get("_request", None) | |
user = getattr(request, "user", None) | |
if not user: | |
return { | |
"isError": True, | |
"content": [{"type": "text", "text": "Authentication required"}] | |
} | |
# Execute the tool | |
return await execute_function(tool.id, kwargs, db, user) | |
# Register the tool with the MCP server | |
mcp_tool = convert_tool_to_mcp_format(tool) | |
mcp_server.register_tool( | |
name=mcp_tool.name, | |
description=mcp_tool.description, | |
parameters=mcp_tool.parameters, | |
handler=tool_handler | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api/mcp/transports.py | |
from mcp.server.transports import StdioTransport, HttpTransport, WebsocketTransport | |
from fastapi import FastAPI, Depends, HTTPException, WebSocket | |
from sqlalchemy.orm import Session | |
from api import database | |
from api.mcp.auth import verify_mcp_token | |
from api.mcp.server import MCPAggregatorServer | |
import asyncio | |
def create_stdio_server(): | |
"""Create an MCP server with stdio transport for local usage.""" | |
db = next(database.get_db()) | |
server = MCPAggregatorServer(db).get_mcp_server() | |
# Use stdio transport | |
transport = StdioTransport() | |
server.serve(transport) | |
return server | |
def mount_to_fastapi(app: FastAPI): | |
"""Mount MCP transports to an existing FastAPI application.""" | |
# HTTP transport endpoint | |
@app.post("/api/v1/mcp-transport/http") | |
async def mcp_http_endpoint(request: dict, db: Session = Depends(database.get_db)): | |
"""HTTP transport endpoint for MCP.""" | |
# Get token from request header | |
token = request.headers.get("Authorization", "").replace("Bearer ", "") | |
user = verify_mcp_token(token, db) | |
# Create MCP server for this user | |
server = MCPAggregatorServer(db).get_mcp_server() | |
# Use HTTP transport | |
transport = HttpTransport() | |
return await server.handle_request(request, transport) | |
# WebSocket transport endpoint | |
@app.websocket("/api/v1/mcp-transport/ws") | |
async def mcp_websocket_endpoint(websocket: WebSocket, db: Session = Depends(database.get_db)): | |
"""WebSocket transport for MCP.""" | |
await websocket.accept() | |
# Get token from query parameter or initial message | |
token = websocket.query_params.get("token", "") | |
try: | |
user = verify_mcp_token(token, db) | |
# Create MCP server for this user | |
server = MCPAggregatorServer(db).get_mcp_server() | |
# Use WebSocket transport | |
transport = WebsocketTransport(websocket) | |
await server.serve(transport) | |
except HTTPException: | |
await websocket.close(code=1008) # Policy violation | |
except Exception as e: | |
await websocket.close(code=1011) # Internal error |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment