|
#!/usr/bin/env python3 |
|
""" |
|
Manim MCP Server - Mathematical animations through the Model Context Protocol |
|
Provides Manim animation creation capabilities for AI assistants. |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import logging |
|
import os |
|
import shutil |
|
import subprocess |
|
import tempfile |
|
import time |
|
from pathlib import Path |
|
from typing import Any, Dict, List, Optional |
|
from datetime import datetime |
|
|
|
import mcp.server.stdio |
|
import mcp.types as types |
|
from mcp.server import NotificationOptions, Server, InitializationOptions |
|
from pydantic import BaseModel |
|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.responses import JSONResponse, RedirectResponse, Response, StreamingResponse |
|
import uvicorn |
|
|
|
# Configure logging |
|
logging.basicConfig( |
|
level=os.getenv('LOG_LEVEL', 'INFO'), |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
# Tool models |
|
class ToolRequest(BaseModel):
    """Request body accepted by the tool-execution HTTP endpoint.

    Clients may supply the tool inputs under either ``arguments`` or
    ``parameters``; :meth:`get_args` resolves which one to use.
    """

    # Name of the tool to run.
    tool: str
    # Tool inputs (preferred field name).
    arguments: Optional[Dict[str, Any]] = None
    # Tool inputs (alternative field name).
    parameters: Optional[Dict[str, Any]] = None
    # Optional identifier supplied by the calling client.
    client_id: Optional[str] = None

    def get_args(self) -> Dict[str, Any]:
        """Return the first non-empty of ``arguments`` / ``parameters``, else ``{}``."""
        for candidate in (self.arguments, self.parameters):
            if candidate:
                return candidate
        return {}
|
|
|
|
|
class ToolResponse(BaseModel):
    """Envelope returned by the tool-execution HTTP endpoint."""

    # True when the tool call completed without raising.
    success: bool
    # Whatever the tool returned; None when the call failed.
    result: Any
    # Error text for failed calls; left as None on success.
    error: Optional[str] = None
|
|
|
|
|
def ensure_directory(path: str) -> str:
    """Create ``path`` (and any missing parents) if needed and hand it back.

    Args:
        path: Directory to create.

    Returns:
        The same ``path`` value that was passed in.

    Raises:
        OSError: If the directory cannot be created (for example, ``path``
            already exists as a regular file).
    """
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
def setup_logging(name: str) -> logging.Logger:
    """Return the logger called ``name`` with its level taken from LOG_LEVEL.

    The LOG_LEVEL environment variable is read case-insensitively and defaults
    to INFO. A value that is not a registered logging level name also falls
    back to INFO, so a typo in the environment cannot crash start-up.

    Args:
        name: Logger name, typically the component name.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    level_name = os.getenv("LOG_LEVEL", "INFO").upper()
    # getLevelName() maps a known level name to its int value; for anything
    # else it returns a "Level ..." string, which setLevel() would reject.
    if not isinstance(logging.getLevelName(level_name), int):
        level_name = "INFO"

    component_logger = logging.getLogger(name)
    component_logger.setLevel(level_name)
    return component_logger
|
|
|
|
|
class ManimMCPServer:
    """MCP server exposing Manim rendering over HTTP (FastAPI) or stdio.

    Tools are described by :meth:`get_tools`. Each tool name must match an
    async method on this class; the REST, JSON-RPC and stdio front ends all
    dispatch by looking that method up by name.
    """

    # Output formats advertised in the tool schema and accepted by the renderer.
    SUPPORTED_FORMATS = ("mp4", "gif", "png", "webm")

    # Public quality names mapped to Manim command-line quality flags.
    QUALITY_FLAGS = {"low": "-ql", "medium": "-qm", "high": "-qh", "fourk": "-qk"}

    def __init__(self, output_dir: str = "/app/output", port: int = 8011):
        """Create the FastAPI app, resolve the output directory and wire routes.

        Args:
            output_dir: Base directory for generated files. The MCP_OUTPUT_DIR
                environment variable, when set, takes precedence over it.
            port: TCP port used by :meth:`run_http`.
        """
        self.name = "Manim MCP Server"
        self.version = "1.0.0"
        self.port = port
        self.logger = setup_logging("ManimMCP")
        self.app = FastAPI(title=self.name, version=self.version)

        # Use environment variable if set
        self.output_dir = os.environ.get("MCP_OUTPUT_DIR", output_dir)
        self.logger.info(f"Using output directory: {self.output_dir}")

        try:
            self.manim_output_dir = ensure_directory(os.path.join(self.output_dir, "animations"))
            self.logger.info("Successfully created output directory")
        except Exception as e:
            self.logger.error(f"Failed to create output directory: {e}")
            # Fall back to a fresh temp directory so the server can still start.
            temp_dir = tempfile.mkdtemp(prefix="mcp_manim_")
            self.output_dir = temp_dir
            self.manim_output_dir = ensure_directory(os.path.join(temp_dir, "animations"))
            self.logger.warning(f"Using fallback temp directory: {temp_dir}")

        self._setup_routes()
        self._setup_events()

    def _setup_events(self):
        """Register the FastAPI startup hook that logs server details."""

        @self.app.on_event("startup")
        async def startup_event():
            self.logger.info(f"{self.name} starting on port {self.port}")
            self.logger.info(f"Server version: {self.version}")
            self.logger.info("Server initialized successfully")

    def _setup_routes(self):
        """Attach the HTTP endpoints to the FastAPI app."""
        self.app.get("/health")(self.health_check)
        self.app.get("/mcp/tools")(self.list_tools)
        self.app.post("/mcp/execute")(self.execute_tool)
        self.app.post("/mcp/register")(self.register_client)
        self.app.post("/register")(self.register_client)
        self.app.get("/messages")(self.handle_messages_get)
        self.app.post("/messages")(self.handle_messages)

    async def health_check(self):
        """Return a small status payload for liveness checks."""
        return {"status": "healthy", "server": self.name, "version": self.version}

    async def register_client(self, request: Dict[str, Any]):
        """Acknowledge a client registration (nothing is stored).

        Args:
            request: JSON body; ``client`` / ``client_name`` and ``client_id``
                are read when present.

        Returns:
            A registration receipt echoing the client name and id.
        """
        client_name = request.get("client", request.get("client_name", "unknown"))
        client_id = request.get("client_id", f"{client_name}_simple")

        self.logger.info(f"Client registration request from: {client_name}")

        # Take one timestamp so both reported times agree.
        now = datetime.utcnow().isoformat()
        return {
            "status": "registered",
            "client": client_name,
            "client_id": client_id,
            "server": self.name,
            "version": self.version,
            "registration": {
                "client_id": client_id,
                "client_name": client_name,
                "registered": True,
                "is_update": False,
                "registration_time": now,
                "server_time": now,
            },
        }

    async def handle_messages_get(self, request: Request):
        """Describe the server for GET requests to /messages."""
        return {
            "protocol": "mcp",
            "version": "1.0",
            "server": {
                "name": self.name,
                "version": self.version,
                "description": f"{self.name} MCP Server",
            },
            "auth": {
                "required": False,
                "type": "none",
            },
            "transport": {
                "type": "streamable-http",
                "endpoint": "/messages",
            },
        }

    async def handle_messages(self, request: Request):
        """Handle JSON-RPC POSTs to /messages (single request or batch).

        Returns:
            A JSON response carrying the result(s), HTTP 202 with no body when
            every message was a notification, or a JSON-RPC parse error with
            HTTP 400 when the body could not be handled.
        """
        try:
            body = await request.json()
            self.logger.info(f"Messages request body: {json.dumps(body)}")

            if isinstance(body, list):
                responses = []
                for req in body:
                    response = await self._process_jsonrpc_request(req)
                    if response:
                        responses.append(response)
                if not responses:
                    # JSON-RPC 2.0: never answer a batch with an empty array.
                    return Response(status_code=202)
                return JSONResponse(content=responses)

            response = await self._process_jsonrpc_request(body)
            if response is None:
                return Response(status_code=202)
            return JSONResponse(content=response)
        except Exception as e:
            self.logger.error(f"Messages endpoint error: {e}")
            return JSONResponse(
                content={
                    "jsonrpc": "2.0",
                    "error": {"code": -32700, "message": "Parse error", "data": str(e)},
                    "id": None,
                },
                status_code=400,
            )

    async def _process_jsonrpc_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Dispatch one JSON-RPC message.

        Args:
            request: Decoded JSON-RPC message.

        Returns:
            The response object, or None for notifications (messages whose
            ``id`` is missing or null), which must not be answered.
        """
        if not isinstance(request, dict):
            # A bare string/number/etc. is not a request object; report it
            # individually instead of failing the whole batch.
            return {
                "jsonrpc": "2.0",
                "error": {"code": -32600, "message": "Invalid Request"},
                "id": None,
            }

        jsonrpc = request.get("jsonrpc", "2.0")
        method = request.get("method")
        # Treat an explicit JSON null the same as omitted params.
        params = request.get("params") or {}
        req_id = request.get("id")

        self.logger.info(f"JSON-RPC request: method={method}, id={req_id}")

        is_notification = req_id is None

        try:
            if method == "initialize":
                result = await self._jsonrpc_initialize(params)
            elif method in ("initialized", "notifications/initialized"):
                self.logger.info("Client sent initialized notification")
                if is_notification:
                    return None
                result = {"status": "acknowledged"}
            elif method == "tools/list":
                result = await self._jsonrpc_list_tools(params)
            elif method == "tools/call":
                result = await self._jsonrpc_call_tool(params)
            elif method == "ping":
                result = {"pong": True}
            else:
                if is_notification:
                    return None
                return {
                    "jsonrpc": jsonrpc,
                    "error": {
                        "code": -32601,
                        "message": f"Method not found: {method}",
                    },
                    "id": req_id,
                }

            if is_notification:
                return None
            return {"jsonrpc": jsonrpc, "result": result, "id": req_id}

        except Exception as e:
            self.logger.error(f"Error processing method {method}: {e}")
            if is_notification:
                return None
            return {
                "jsonrpc": jsonrpc,
                "error": {
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
                "id": req_id,
            }

    async def _jsonrpc_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer ``initialize``, echoing the client's requested protocol version."""
        client_info = params.get("clientInfo", {})
        protocol_version = params.get("protocolVersion", "2024-11-05")

        self.logger.info(f"Client info: {client_info}, requested protocol: {protocol_version}")

        return {
            "protocolVersion": protocol_version,
            "serverInfo": {"name": self.name, "version": self.version},
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {},
                "prompts": {},
            },
        }

    async def _jsonrpc_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer ``tools/list`` with name, description and input schema per tool."""
        tools = self.get_tools()
        self.logger.info(f"Available tools: {list(tools.keys())}")

        return {
            "tools": [
                {
                    "name": tool_name,
                    "description": tool_info.get("description", ""),
                    "inputSchema": tool_info.get("parameters", {}),
                }
                for tool_name, tool_info in tools.items()
            ]
        }

    async def _jsonrpc_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer ``tools/call`` by running the named tool.

        Raises:
            ValueError: If the tool name is missing, unknown, or has no
                matching method on this class.

        Returns:
            An MCP content payload; tool failures are reported in-band with
            ``isError`` set rather than raised.
        """
        tool_name = params.get("name")
        # Treat an explicit JSON null the same as omitted arguments.
        arguments = params.get("arguments") or {}

        if not tool_name:
            raise ValueError("Tool name is required")

        # Only names listed by get_tools() may be resolved with getattr().
        tools = self.get_tools()
        if tool_name not in tools:
            raise ValueError(f"Tool '{tool_name}' not found")

        tool_func = getattr(self, tool_name, None)
        if not tool_func:
            raise ValueError(f"Tool '{tool_name}' not implemented")

        try:
            result = await tool_func(**arguments)

            if isinstance(result, dict):
                content_text = json.dumps(result, indent=2)
            else:
                content_text = str(result)

            return {"content": [{"type": "text", "text": content_text}]}
        except Exception as e:
            self.logger.error(f"Error calling tool {tool_name}: {e}")
            return {
                "content": [{"type": "text", "text": f"Error executing {tool_name}: {str(e)}"}],
                "isError": True,
            }

    async def list_tools(self):
        """List available tools for the REST endpoint."""
        return {
            "tools": [
                {
                    "name": tool_name,
                    "description": tool_info.get("description", ""),
                    "parameters": tool_info.get("parameters", {}),
                }
                for tool_name, tool_info in self.get_tools().items()
            ]
        }

    async def execute_tool(self, request: ToolRequest):
        """Execute a tool through the REST endpoint.

        Raises:
            HTTPException: 404 for an unknown tool, 501 when the tool has no
                matching method.

        Returns:
            A ``ToolResponse``; failures inside the tool itself are reported
            with ``success=False``.
        """
        try:
            tools = self.get_tools()
            if request.tool not in tools:
                raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")

            tool_func = getattr(self, request.tool, None)
            if not tool_func:
                raise HTTPException(status_code=501, detail=f"Tool '{request.tool}' not implemented")

            result = await tool_func(**request.get_args())

            return ToolResponse(success=True, result=result)
        except HTTPException:
            # Let FastAPI turn these into the intended 404/501 responses
            # instead of folding them into a 200 "success: false" body.
            raise
        except Exception as e:
            self.logger.error(f"Error executing tool {request.tool}: {str(e)}")
            return ToolResponse(success=False, result=None, error=str(e))

    def get_tools(self) -> Dict[str, Dict[str, Any]]:
        """Return the tool catalogue: tool name -> description and JSON schema."""
        return {
            "create_manim_animation": {
                "description": "Create mathematical animations using Manim",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "script": {
                            "type": "string",
                            "description": "Python script for Manim animation",
                        },
                        "output_format": {
                            "type": "string",
                            "enum": list(self.SUPPORTED_FORMATS),
                            "default": "mp4",
                            "description": "Output format for the animation",
                        },
                        "quality": {
                            "type": "string",
                            "enum": list(self.QUALITY_FLAGS),
                            "default": "medium",
                            "description": "Rendering quality",
                        },
                        "preview": {
                            "type": "boolean",
                            "default": False,
                            "description": "Generate preview frame instead of full animation",
                        },
                    },
                    "required": ["script"],
                },
            },
        }

    def _find_rendered_file(self, script_stem: str, extensions: List[str]) -> Optional[Path]:
        """Locate the newest file rendered for one script.

        Only directories with a path component equal to ``script_stem`` are
        considered, so outputs from earlier renders and previously copied
        ``animation_*`` files are never picked up. This assumes Manim's
        default layout, which groups output under a folder named after the
        script file.

        Args:
            script_stem: File name of the rendered script without ``.py``.
            extensions: Acceptable extensions, without the leading dot.

        Returns:
            Path of the most recently modified match, or None if none exists.
        """
        wanted = {f".{ext}" for ext in extensions}
        candidates = []
        for root, _dirs, files in os.walk(self.manim_output_dir):
            parts = Path(root).parts
            # Skip other scripts' output and Manim's intermediate movie chunks.
            if script_stem not in parts or "partial_movie_files" in parts:
                continue
            for filename in files:
                candidate = Path(root) / filename
                if candidate.suffix in wanted:
                    candidates.append(candidate)

        if not candidates:
            return None
        return max(candidates, key=lambda path: path.stat().st_mtime)

    async def create_manim_animation(
        self,
        script: str,
        output_format: str = "mp4",
        quality: str = "medium",
        preview: bool = False,
    ) -> Dict[str, Any]:
        """Create Manim animation from Python script

        The script is written to a temporary file, rendered with the ``manim``
        command-line tool in a worker thread (so the event loop stays
        responsive), and the result is copied to a uniquely named file in the
        animations directory.

        Args:
            script: Python script containing Manim scene
            output_format: Output format (mp4, gif, png, webm)
            quality: Rendering quality (low, medium, high, fourk); unknown
                values fall back to medium
            preview: Generate preview frame only

        Returns:
            Dictionary with ``success`` plus either the output path and
            metadata, or an ``error`` message. Failures are reported in the
            dictionary rather than raised.
        """
        if output_format not in self.SUPPORTED_FORMATS:
            return {
                "success": False,
                "error": (
                    f"Unsupported output format '{output_format}'. "
                    f"Expected one of: {', '.join(self.SUPPORTED_FORMATS)}"
                ),
            }

        script_path = None
        try:
            # UTF-8 matches Python's default source encoding, so scripts with
            # non-ASCII text survive regardless of the process locale.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".py", delete=False, encoding="utf-8"
            ) as f:
                script_path = f.name
                f.write(script)

            cmd = [
                "manim",
                "render",
                "--media_dir",
                self.manim_output_dir,
                self.QUALITY_FLAGS.get(quality, "-qm"),
                "--format",
                output_format,
            ]
            if preview:
                cmd.append("-s")  # Save last frame
            cmd.append(script_path)

            self.logger.info(f"Running Manim: {' '.join(cmd)}")

            try:
                # stdin is detached so the child can never read from this
                # process's stdin, which carries MCP traffic in stdio mode.
                result = await asyncio.to_thread(
                    subprocess.run,
                    cmd,
                    capture_output=True,
                    text=True,
                    check=False,
                    stdin=subprocess.DEVNULL,
                )
            except FileNotFoundError:
                return {
                    "success": False,
                    "error": "Manim not found. Please install it first.",
                }

            if result.returncode != 0:
                return {
                    "success": False,
                    "error": result.stderr or "Animation creation failed",
                    "stdout": result.stdout,
                }

            # NOTE(review): with -s Manim is expected to write a PNG still
            # whatever --format says, so accept .png for previews as well.
            extensions = [output_format]
            if preview and "png" not in extensions:
                extensions.insert(0, "png")

            rendered = self._find_rendered_file(Path(script_path).stem, extensions)
            if rendered is None:
                return {
                    "success": False,
                    "error": "Output file not found after rendering",
                }

            # Copy to a stable location; the timestamp keeps successive
            # renders in one process from overwriting each other.
            actual_format = rendered.suffix.lstrip(".")
            final_path = os.path.join(
                self.manim_output_dir,
                f"animation_{os.getpid()}_{time.time_ns()}.{actual_format}",
            )
            shutil.copy(rendered, final_path)

            return {
                "success": True,
                "output_path": final_path,
                "format": actual_format,
                "quality": quality,
                "preview": preview,
            }
        except Exception as e:
            self.logger.error(f"Manim error: {str(e)}")
            return {"success": False, "error": str(e)}
        finally:
            # Always remove the temporary script, including on failure paths.
            if script_path is not None:
                try:
                    os.unlink(script_path)
                except OSError as e:
                    self.logger.warning(f"Could not remove temporary script {script_path}: {e}")

    async def run_stdio(self):
        """Run the server in stdio mode (for Claude desktop app)"""
        server = Server(self.name)

        # Store tools and their functions for later access
        self._tools = self.get_tools()
        self._tool_funcs = {}
        for tool_name in self._tools:
            tool_func = getattr(self, tool_name, None)
            if tool_func:
                self._tool_funcs[tool_name] = tool_func

        @server.list_tools()
        async def list_tools() -> List[types.Tool]:
            """List available tools"""
            return [
                types.Tool(
                    name=tool_name,
                    description=tool_info.get("description", ""),
                    inputSchema=tool_info.get("parameters", {}),
                )
                for tool_name, tool_info in self._tools.items()
            ]

        @server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Call a tool with given arguments"""
            if name not in self._tool_funcs:
                return [types.TextContent(type="text", text=f"Tool '{name}' not found")]

            try:
                # Guard against a missing arguments object.
                result = await self._tool_funcs[name](**(arguments or {}))

                # Convert result to MCP response format
                if isinstance(result, dict):
                    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
                return [types.TextContent(type="text", text=str(result))]
            except Exception as e:
                self.logger.error(f"Error calling tool {name}: {str(e)}")
                return [types.TextContent(type="text", text=f"Error: {str(e)}")]

        # Run the stdio server
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name=self.name,
                    server_version=self.version,
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )

    def run_http(self):
        """Run the server in HTTP mode"""
        uvicorn.run(self.app, host="0.0.0.0", port=self.port)

    def run(self, mode: str = "http"):
        """Run the server in the given mode.

        Args:
            mode: ``"stdio"`` or ``"http"``.

        Raises:
            ValueError: If ``mode`` is neither of the supported values.
        """
        if mode == "stdio":
            asyncio.run(self.run_stdio())
        elif mode == "http":
            self.run_http()
        else:
            raise ValueError(f"Unknown mode: {mode}. Use 'stdio' or 'http'.")
|
|
|
|
|
def main():
    """Parse command-line options and start the Manim MCP Server.

    Recognised options are ``--mode`` (stdio or http), ``--port`` and
    ``--output-dir``; the latter defaults to the MCP_OUTPUT_DIR environment
    variable when it is set.
    """
    import argparse

    # Each entry is (flag, keyword arguments for add_argument), in CLI order.
    option_specs = (
        (
            "--mode",
            {
                "choices": ["stdio", "http"],
                "default": "stdio",
                "help": "Server mode (http or stdio)",
            },
        ),
        (
            "--port",
            {
                "type": int,
                "default": 8011,
                "help": "Port to run the server on (HTTP mode only)",
            },
        ),
        (
            "--output-dir",
            {
                "default": os.environ.get("MCP_OUTPUT_DIR", "/app/output"),
                "help": "Output directory for animations",
            },
        ),
    )

    cli = argparse.ArgumentParser(description="Manim MCP Server")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    ManimMCPServer(output_dir=options.output_dir, port=options.port).run(mode=options.mode)
|
|
|
|
|
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
This is awesome. How do I set up the MCP server with Claude Code?