|
#!/usr/bin/env python3 |
|
""" |
|
Gemini AI Integration MCP Server |
|
Provides development workflow automation with AI second opinions |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import logging |
|
import os |
|
import sys |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Any, Dict, List, Optional |
|
|
|
import mcp.server.stdio |
|
import mcp.types as types |
|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.responses import JSONResponse, RedirectResponse, Response |
|
from mcp.server import InitializationOptions, NotificationOptions, Server |
|
from pydantic import BaseModel |
|
|
|
# Module-level logging: basicConfig configures the root logger once at import
# time; setup_logging() below attaches a dedicated handler for the server.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
def check_container_environment() -> bool:
    """Return True when this process appears to run inside a container.

    Detection heuristics: the Docker sentinel file /.dockerenv, or a
    non-empty CONTAINER_ENV environment variable.
    """
    # bool(...) normalizes the truthy env-var string to a real boolean.
    return bool(os.path.exists("/.dockerenv") or os.environ.get("CONTAINER_ENV"))
|
|
|
|
|
def setup_logging(name: str) -> logging.Logger:
    """Return an INFO-level logger with a single formatted console handler.

    Bug fix: the original added a new StreamHandler on *every* call, so a
    second call for the same name produced duplicated log lines. The handler
    is now attached only if the logger has none yet.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    if not logger.handlers:
        # Create console handler with timestamp/name/level formatting
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    return logger
|
|
|
|
|
class ToolRequest(BaseModel):
    """Request payload for tool execution.

    The argument map may arrive under either "arguments" or "parameters",
    depending on the client; get_args() hides that difference.
    """
    tool: str
    arguments: Optional[Dict[str, Any]] = None
    parameters: Optional[Dict[str, Any]] = None
    client_id: Optional[str] = None

    def get_args(self) -> Dict[str, Any]:
        """Return the first non-empty argument map, or {} when none given."""
        for candidate in (self.arguments, self.parameters):
            if candidate:
                return candidate
        return {}
|
|
|
|
|
class ToolResponse(BaseModel):
    """Model for tool execution responses"""
    # True when the tool ran without raising; failures are reported via `error`.
    success: bool
    # The tool's return value (shape depends on the tool); None on failure.
    result: Any
    # Human-readable error message, set only when success is False.
    error: Optional[str] = None
|
|
|
|
|
class GeminiMCPServer: |
|
"""MCP Server for Gemini AI integration and consultation""" |
|
|
|
    def __init__(self, project_root: Optional[str] = None):
        """Build the server: refuse containers, wire FastAPI, load Gemini config.

        Args:
            project_root: Directory searched for .env and gemini-config.json;
                defaults to the current working directory.
        """
        # Check if running in container and exit if true — the Gemini CLI
        # needs host-level Docker access, so this is a hard error.
        if check_container_environment():
            print(
                "ERROR: Gemini MCP Server cannot run inside a container!",
                file=sys.stderr,
            )
            print(
                "The Gemini CLI requires Docker access and must run on the host system.",
                file=sys.stderr,
            )
            print("Please launch this server directly on the host with:", file=sys.stderr)
            print(" python gemini_mcp_server.py", file=sys.stderr)
            sys.exit(1)

        # Initialize base server attributes
        self.name = "Gemini MCP Server"
        self.version = "1.0.0"
        self.port = 8006  # Standard Gemini MCP port
        self.logger = setup_logging("GeminiMCP")
        self.app = FastAPI(title=self.name, version=self.version)
        self._setup_routes()
        self._setup_events()

        # Must be set before _load_gemini_config(), which reads it.
        self.project_root = Path(project_root) if project_root else Path.cwd()

        # Initialize Gemini integration (falls back to a disabled mock when
        # the gemini_integration module cannot be imported).
        self.gemini_config = self._load_gemini_config()
        self.gemini = self._initialize_gemini()

        # Track uncertainty for auto-consultation
        self.last_response_uncertainty = None
|
|
|
    def _setup_events(self):
        """Setup startup/shutdown events.

        NOTE(review): @app.on_event is deprecated in newer FastAPI releases in
        favor of lifespan handlers — migrate when FastAPI is upgraded.
        """
        @self.app.on_event("startup")
        async def startup_event():
            # Purely informational; no resources are acquired at startup.
            self.logger.info(f"{self.name} starting on port {self.port}")
            self.logger.info(f"Server version: {self.version}")
            self.logger.info("Server initialized successfully")
|
|
|
    def _setup_routes(self):
        """Setup common HTTP routes"""
        # Health and tool management
        self.app.get("/health")(self.health_check)
        self.app.get("/mcp/tools")(self.list_tools)
        self.app.post("/mcp/execute")(self.execute_tool)
        self.app.post("/mcp/register")(self.register_client)
        # OAuth2 endpoints — all bypass real authentication (home-lab setup)
        self.app.post("/register")(self.register_client_oauth)
        self.app.post("/oauth/register")(self.register_client_oauth)
        self.app.get("/authorize")(self.oauth_authorize_bypass)
        self.app.post("/authorize")(self.oauth_authorize_bypass)
        self.app.get("/oauth/authorize")(self.oauth_authorize_bypass)
        self.app.post("/oauth/authorize")(self.oauth_authorize_bypass)
        self.app.post("/token")(self.oauth_token_bypass)
        self.app.post("/oauth/token")(self.oauth_token_bypass)
        # Client bookkeeping (stubbed — returns canned data)
        self.app.get("/mcp/clients")(self.list_clients)
        self.app.get("/mcp/clients/{client_id}")(self.get_client_info)
        self.app.get("/mcp/stats")(self.get_stats)
        # Discovery metadata (.well-known)
        self.app.get("/.well-known/oauth-authorization-server")(self.oauth_discovery)
        self.app.get("/.well-known/oauth-authorization-server/mcp")(self.oauth_discovery)
        self.app.get("/.well-known/oauth-authorization-server/messages")(self.oauth_discovery)
        self.app.get("/.well-known/oauth-protected-resource")(self.oauth_protected_resource)
        self.app.get("/.well-known/mcp")(self.mcp_discovery)
        # MCP protocol endpoints (JSON-RPC over HTTP + SSE streaming)
        self.app.post("/mcp/initialize")(self.mcp_initialize)
        self.app.get("/mcp/capabilities")(self.mcp_capabilities)
        self.app.get("/messages")(self.handle_messages_get)
        self.app.post("/messages")(self.handle_messages)
        self.app.get("/mcp")(self.handle_mcp_get)
        self.app.post("/mcp")(self.handle_jsonrpc)
        self.app.options("/mcp")(self.handle_options)
        self.app.post("/mcp/rpc")(self.handle_jsonrpc)
        self.app.get("/mcp/sse")(self.handle_mcp_sse)
|
|
|
async def health_check(self): |
|
"""Health check endpoint""" |
|
return {"status": "healthy", "server": self.name, "version": self.version} |
|
|
|
async def register_client(self, request: Dict[str, Any]): |
|
"""Register a client - simplified for home lab use""" |
|
client_name = request.get("client", request.get("client_name", "unknown")) |
|
client_id = request.get("client_id", f"{client_name}_simple") |
|
|
|
self.logger.info(f"Client registration request from: {client_name}") |
|
|
|
return { |
|
"status": "registered", |
|
"client": client_name, |
|
"client_id": client_id, |
|
"server": self.name, |
|
"version": self.version, |
|
"registration": { |
|
"client_id": client_id, |
|
"client_name": client_name, |
|
"registered": True, |
|
"is_update": False, |
|
"registration_time": datetime.utcnow().isoformat(), |
|
"server_time": datetime.utcnow().isoformat(), |
|
}, |
|
} |
|
|
|
async def register_client_oauth(self, request_data: Dict[str, Any], request: Request): |
|
"""OAuth2-style client registration - simplified for home lab use""" |
|
redirect_uris = request_data.get("redirect_uris", []) |
|
client_name = request_data.get("client_name", request_data.get("client", "claude-code")) |
|
client_id = f"{client_name}_oauth" |
|
|
|
self.logger.info(f"OAuth registration request from: {client_name}") |
|
|
|
return { |
|
"client_id": client_id, |
|
"client_name": client_name, |
|
"redirect_uris": redirect_uris if redirect_uris else ["http://localhost"], |
|
"grant_types": request_data.get("grant_types", ["authorization_code"]), |
|
"response_types": request_data.get("response_types", ["code"]), |
|
"token_endpoint_auth_method": request_data.get("token_endpoint_auth_method", "none"), |
|
"registration_access_token": "not-required-for-local-mcp", |
|
"registration_client_uri": f"{request.url.scheme}://{request.url.netloc}/mcp/clients/{client_id}", |
|
"client_id_issued_at": int(datetime.utcnow().timestamp()), |
|
"client_secret_expires_at": 0, |
|
} |
|
|
|
async def oauth_authorize_bypass(self, request: Request): |
|
"""Bypass OAuth2 authorization - immediately approve without auth""" |
|
params = dict(request.query_params) |
|
redirect_uri = params.get("redirect_uri", "http://localhost") |
|
state = params.get("state", "") |
|
|
|
auth_code = "bypass-auth-code-no-auth-required" |
|
|
|
separator = "&" if "?" in redirect_uri else "?" |
|
redirect_url = f"{redirect_uri}{separator}code={auth_code}" |
|
if state: |
|
redirect_url += f"&state={state}" |
|
|
|
return RedirectResponse(url=redirect_url, status_code=302) |
|
|
|
async def oauth_token_bypass(self, request: Request): |
|
"""Bypass OAuth2 token exchange - immediately return access token""" |
|
try: |
|
if request.headers.get("content-type", "").startswith("application/json"): |
|
request_data = await request.json() |
|
else: |
|
form_data = await request.form() |
|
request_data = dict(form_data) |
|
except Exception: |
|
request_data = {} |
|
|
|
self.logger.info(f"Token request data: {request_data}") |
|
|
|
return { |
|
"access_token": "bypass-token-no-auth-required", |
|
"token_type": "Bearer", |
|
"expires_in": 31536000, |
|
"scope": "full_access", |
|
"refresh_token": "bypass-refresh-token-no-auth-required", |
|
} |
|
|
|
async def oauth_discovery(self, request: Request): |
|
"""OAuth 2.0 authorization server metadata""" |
|
base_url = f"{request.url.scheme}://{request.url.netloc}" |
|
return { |
|
"issuer": base_url, |
|
"authorization_endpoint": f"{base_url}/authorize", |
|
"token_endpoint": f"{base_url}/token", |
|
"registration_endpoint": f"{base_url}/register", |
|
"token_endpoint_auth_methods_supported": ["none"], |
|
"response_types_supported": ["code"], |
|
"grant_types_supported": ["authorization_code"], |
|
"code_challenge_methods_supported": ["S256"], |
|
"registration_endpoint_auth_methods_supported": ["none"], |
|
} |
|
|
|
async def oauth_protected_resource(self, request: Request): |
|
"""OAuth 2.0 protected resource metadata""" |
|
base_url = f"{request.url.scheme}://{request.url.netloc}" |
|
return { |
|
"resource": f"{base_url}/messages", |
|
"authorization_servers": [base_url], |
|
} |
|
|
|
    async def handle_mcp_get(self, request: Request):
        """Handle GET requests to /mcp endpoint for SSE streaming.

        Reuses the client-supplied Mcp-Session-Id header when present,
        otherwise mints a fresh UUID. Sends one "connection" event, then a
        "ping" every 15 seconds for as long as the client stays connected.
        """
        import uuid
        from fastapi.responses import StreamingResponse

        session_id = request.headers.get("Mcp-Session-Id", str(uuid.uuid4()))

        async def event_generator():
            # Initial handshake event so the client learns its session id.
            connection_data = {
                "type": "connection",
                "sessionId": session_id,
                "status": "connected",
            }
            yield f"data: {json.dumps(connection_data)}\n\n"

            # Keep-alive loop; presumably torn down by the ASGI server when
            # the client disconnects — confirm against uvicorn behavior.
            while True:
                await asyncio.sleep(15)
                ping_data = {"type": "ping", "timestamp": datetime.utcnow().isoformat()}
                yield f"data: {json.dumps(ping_data)}\n\n"

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # disable proxy buffering for SSE
                "Mcp-Session-Id": session_id,
            },
        )
|
|
|
    async def handle_mcp_sse(self, request: Request):
        """Handle SSE requests for authenticated clients.

        NOTE(review): only the *presence* of a Bearer token is checked — any
        value passes, consistent with the bypass token endpoint above.
        """
        from fastapi.responses import StreamingResponse

        auth_header = request.headers.get("authorization", "")
        if not auth_header.startswith("Bearer "):
            raise HTTPException(status_code=401, detail="Unauthorized")

        async def event_generator():
            # Announce the connection, then ping every 30s to keep it open.
            yield f"data: {json.dumps({'type': 'connected', 'message': 'SSE connection established'})}\n\n"

            while True:
                await asyncio.sleep(30)
                yield f"data: {json.dumps({'type': 'ping'})}\n\n"

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # disable proxy buffering for SSE
            },
        )
|
|
|
async def handle_messages_get(self, request: Request): |
|
"""Handle GET requests to /messages endpoint""" |
|
return { |
|
"protocol": "mcp", |
|
"version": "1.0", |
|
"server": { |
|
"name": self.name, |
|
"version": self.version, |
|
"description": f"{self.name} MCP Server", |
|
}, |
|
"auth": { |
|
"required": False, |
|
"type": "none", |
|
}, |
|
"transport": { |
|
"type": "streamable-http", |
|
"endpoint": "/messages", |
|
}, |
|
} |
|
|
|
    async def handle_messages(self, request: Request):
        """Handle POST requests to /messages endpoint (HTTP Stream Transport).

        Accepts a single JSON-RPC request or a batch (list). Depending on the
        Mcp-Response-Mode header ("batch" default, or "stream") responses are
        returned either as one JSON payload or as an SSE stream. Notifications
        (no response from _process_jsonrpc_request) yield HTTP 202.
        """
        session_id = request.headers.get("Mcp-Session-Id")
        response_mode = request.headers.get("Mcp-Response-Mode", "batch").lower()
        protocol_version = request.headers.get("MCP-Protocol-Version")

        self.logger.info(f"Messages request headers: {dict(request.headers)}")
        self.logger.info(f"Session ID: {session_id}, Response Mode: {response_mode}, Protocol Version: {protocol_version}")

        try:
            body = await request.json()
            self.logger.info(f"Messages request body: {json.dumps(body)}")

            # An "initialize" request with no session header starts a new
            # session: mint a UUID so the client can echo it back later.
            is_init_request = False
            if isinstance(body, dict) and body.get("method") == "initialize":
                is_init_request = True
                if not session_id:
                    import uuid
                    session_id = str(uuid.uuid4())
                    self.logger.info(f"Generated new session ID: {session_id}")

            if response_mode == "stream":
                from fastapi.responses import StreamingResponse

                async def event_generator():
                    # Session announcement first, then one event per response,
                    # then a final "completion" marker.
                    if session_id:
                        yield f"data: {json.dumps({'type': 'session', 'sessionId': session_id})}\n\n"

                    if isinstance(body, list):
                        for req in body:
                            response = await self._process_jsonrpc_request(req)
                            if response:
                                yield f"data: {json.dumps(response)}\n\n"
                    else:
                        response = await self._process_jsonrpc_request(body)
                        if response:
                            yield f"data: {json.dumps(response)}\n\n"

                    yield f"data: {json.dumps({'type': 'completion'})}\n\n"

                return StreamingResponse(
                    event_generator(),
                    media_type="text/event-stream",
                    headers={
                        "Cache-Control": "no-cache",
                        "Connection": "keep-alive",
                        "X-Accel-Buffering": "no",
                        "Mcp-Session-Id": session_id or "",
                    },
                )
            else:
                # Batch mode: collect responses; all-notification batches get
                # an empty 202 Accepted instead of a JSON body.
                if isinstance(body, list):
                    responses = []
                    has_notifications = False
                    for req in body:
                        response = await self._process_jsonrpc_request(req)
                        if response is None:
                            has_notifications = True
                        else:
                            responses.append(response)

                    if not responses and has_notifications:
                        return Response(
                            status_code=202,
                            headers={
                                "Mcp-Session-Id": session_id or "",
                            },
                        )

                    return JSONResponse(
                        content=responses,
                        headers={
                            "Content-Type": "application/json",
                            "Mcp-Session-Id": session_id or "",
                        },
                    )
                else:
                    response = await self._process_jsonrpc_request(body)
                    if response is None:
                        # Single notification: acknowledge without a body.
                        return Response(
                            status_code=202,
                            headers={
                                "Mcp-Session-Id": session_id or "",
                            },
                        )
                    else:
                        if is_init_request and session_id:
                            self.logger.info(f"Returning session ID in response: {session_id}")

                        return JSONResponse(
                            content=response,
                            headers={
                                "Content-Type": "application/json",
                                "Mcp-Session-Id": session_id or "",
                            },
                        )
        except Exception as e:
            # Anything unparseable (or an unexpected handler failure) becomes
            # a JSON-RPC parse error (-32700) with HTTP 400.
            self.logger.error(f"Messages endpoint error: {e}")
            return JSONResponse(
                content={
                    "jsonrpc": "2.0",
                    "error": {"code": -32700, "message": "Parse error", "data": str(e)},
                    "id": None,
                },
                status_code=400,
                headers={
                    "Content-Type": "application/json",
                    "Mcp-Session-Id": session_id or "",
                },
            )
|
|
|
    async def handle_jsonrpc(self, request: Request):
        """Handle JSON-RPC 2.0 requests for MCP protocol.

        Thin alias used by /mcp and /mcp/rpc: delegates to handle_messages so
        all JSON-RPC traffic shares a single code path.
        """
        return await self.handle_messages(request)
|
|
|
async def handle_options(self, request: Request): |
|
"""Handle OPTIONS requests for CORS preflight""" |
|
return Response( |
|
content="", |
|
headers={ |
|
"Access-Control-Allow-Origin": "*", |
|
"Access-Control-Allow-Methods": "GET, POST, OPTIONS", |
|
"Access-Control-Allow-Headers": "Content-Type, Authorization, Mcp-Session-Id, Mcp-Response-Mode", |
|
"Access-Control-Max-Age": "86400", |
|
}, |
|
) |
|
|
|
    async def _process_jsonrpc_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single JSON-RPC request dict.

        Returns the JSON-RPC response dict, or None for notifications
        (requests without an "id"), which must not be answered.
        """
        jsonrpc = request.get("jsonrpc", "2.0")
        method = request.get("method")
        params = request.get("params", {})
        req_id = request.get("id")

        self.logger.info(f"JSON-RPC request: method={method}, id={req_id}")

        # No id => notification => no response. NOTE(review): an explicit
        # "id": null is also treated as a notification here, which strict
        # JSON-RPC 2.0 does not allow — confirm clients never send that.
        is_notification = req_id is None

        try:
            if method == "initialize":
                result = await self._jsonrpc_initialize(params)
            elif method == "initialized":
                self.logger.info("Client sent initialized notification")
                if is_notification:
                    return None
                result = {"status": "acknowledged"}
            elif method == "tools/list":
                result = await self._jsonrpc_list_tools(params)
            elif method == "tools/call":
                result = await self._jsonrpc_call_tool(params)
            elif method == "completion/complete":
                # Completions intentionally unsupported; reported in "result".
                result = {"error": "Completions not supported"}
            elif method == "ping":
                result = {"pong": True}
            else:
                # Unknown method -> JSON-RPC "method not found" (-32601).
                if not is_notification:
                    return {
                        "jsonrpc": jsonrpc,
                        "error": {
                            "code": -32601,
                            "message": f"Method not found: {method}",
                        },
                        "id": req_id,
                    }
                return None

            if not is_notification:
                response = {"jsonrpc": jsonrpc, "result": result, "id": req_id}
                self.logger.info(f"JSON-RPC response: {json.dumps(response)}")

                if method == "initialize" and "protocolVersion" in result:
                    self.logger.info("Initialization complete, ready for tools/list request")
                    self.logger.info("Expecting client to send 'tools/list' request next")

                return response
            return None

        except Exception as e:
            # Any handler failure becomes a JSON-RPC internal error (-32603).
            self.logger.error(f"Error processing method {method}: {e}")
            if not is_notification:
                return {
                    "jsonrpc": jsonrpc,
                    "error": {
                        "code": -32603,
                        "message": "Internal error",
                        "data": str(e),
                    },
                    "id": req_id,
                }
            return None
|
|
|
async def _jsonrpc_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Handle initialize request""" |
|
client_info = params.get("clientInfo", {}) |
|
protocol_version = params.get("protocolVersion", "2024-11-05") |
|
|
|
self.logger.info(f"Client info: {client_info}, requested protocol: {protocol_version}") |
|
|
|
self._protocol_version = protocol_version |
|
|
|
return { |
|
"protocolVersion": protocol_version, |
|
"serverInfo": {"name": self.name, "version": self.version}, |
|
"capabilities": { |
|
"tools": {"listChanged": True}, |
|
"resources": {}, |
|
"prompts": {}, |
|
}, |
|
} |
|
|
|
async def _jsonrpc_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Handle tools/list request""" |
|
tools = self.get_tools() |
|
self.logger.info(f"Available tools from get_tools(): {list(tools.keys())}") |
|
|
|
tool_list = [] |
|
|
|
for tool_name, tool_info in tools.items(): |
|
tool_list.append( |
|
{ |
|
"name": tool_name, |
|
"description": tool_info.get("description", ""), |
|
"inputSchema": tool_info.get("parameters", {}), |
|
} |
|
) |
|
|
|
self.logger.info(f"Returning {len(tool_list)} tools to client") |
|
return {"tools": tool_list} |
|
|
|
async def _jsonrpc_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Handle tools/call request""" |
|
tool_name = params.get("name") |
|
arguments = params.get("arguments", {}) |
|
|
|
if not tool_name: |
|
raise ValueError("Tool name is required") |
|
|
|
tools = self.get_tools() |
|
if tool_name not in tools: |
|
raise ValueError(f"Tool '{tool_name}' not found") |
|
|
|
tool_func = getattr(self, tool_name, None) |
|
if not tool_func: |
|
raise ValueError(f"Tool '{tool_name}' not implemented") |
|
|
|
try: |
|
result = await tool_func(**arguments) |
|
|
|
if isinstance(result, dict): |
|
content_text = json.dumps(result, indent=2) |
|
else: |
|
content_text = str(result) |
|
|
|
return {"content": [{"type": "text", "text": content_text}]} |
|
except Exception as e: |
|
self.logger.error(f"Error calling tool {tool_name}: {e}") |
|
return { |
|
"content": [{"type": "text", "text": f"Error executing {tool_name}: {str(e)}"}], |
|
"isError": True, |
|
} |
|
|
|
async def mcp_discovery(self): |
|
"""MCP protocol discovery endpoint""" |
|
return { |
|
"mcp_version": "1.0", |
|
"server_name": self.name, |
|
"server_version": self.version, |
|
"capabilities": { |
|
"tools": True, |
|
"prompts": False, |
|
"resources": False, |
|
}, |
|
"endpoints": { |
|
"tools": "/mcp/tools", |
|
"execute": "/mcp/execute", |
|
"initialize": "/mcp/initialize", |
|
"capabilities": "/mcp/capabilities", |
|
}, |
|
} |
|
|
|
    async def mcp_info(self):
        """MCP server information.

        NOTE(review): this handler is not registered in _setup_routes and
        handle_messages_get returns a superset of this payload — looks like
        dead code; confirm before removing.
        """
        return {
            "protocol": "mcp",
            "version": "1.0",
            "server": {
                "name": self.name,
                "version": self.version,
                "description": f"{self.name} MCP Server",
            },
            "auth": {
                "required": False,
                "type": "none",
            },
        }
|
|
|
async def mcp_initialize(self, request: Dict[str, Any]): |
|
"""Initialize MCP session""" |
|
client_info = request.get("client", {}) |
|
return { |
|
"session_id": f"session-{client_info.get('name', 'unknown')}-{int(datetime.utcnow().timestamp())}", |
|
"server": { |
|
"name": self.name, |
|
"version": self.version, |
|
}, |
|
"capabilities": { |
|
"tools": True, |
|
"prompts": False, |
|
"resources": False, |
|
}, |
|
} |
|
|
|
async def mcp_capabilities(self): |
|
"""Return server capabilities""" |
|
tools = self.get_tools() |
|
return { |
|
"capabilities": { |
|
"tools": { |
|
"list": list(tools.keys()), |
|
"count": len(tools), |
|
}, |
|
"prompts": { |
|
"supported": False, |
|
}, |
|
"resources": { |
|
"supported": False, |
|
}, |
|
}, |
|
} |
|
|
|
async def list_tools(self): |
|
"""List available tools""" |
|
tools = self.get_tools() |
|
return { |
|
"tools": [ |
|
{ |
|
"name": tool_name, |
|
"description": tool_info.get("description", ""), |
|
"parameters": tool_info.get("parameters", {}), |
|
} |
|
for tool_name, tool_info in tools.items() |
|
] |
|
} |
|
|
|
async def execute_tool(self, request: ToolRequest): |
|
"""Execute a tool with given arguments""" |
|
try: |
|
tools = self.get_tools() |
|
if request.tool not in tools: |
|
raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found") |
|
|
|
tool_func = getattr(self, request.tool, None) |
|
if not tool_func: |
|
raise HTTPException(status_code=501, detail=f"Tool '{request.tool}' not implemented") |
|
|
|
result = await tool_func(**request.get_args()) |
|
|
|
return ToolResponse(success=True, result=result) |
|
|
|
except Exception as e: |
|
self.logger.error(f"Error executing tool {request.tool}: {str(e)}") |
|
return ToolResponse(success=False, result=None, error=str(e)) |
|
|
|
async def list_clients(self, active_only: bool = True): |
|
"""List clients - returns empty for home lab use""" |
|
return {"clients": [], "count": 0, "active_only": active_only} |
|
|
|
async def get_client_info(self, client_id: str): |
|
"""Get client info - returns simple response for home lab use""" |
|
return { |
|
"client_id": client_id, |
|
"client_name": client_id.replace("_oauth", "").replace("_simple", ""), |
|
"active": True, |
|
"registered_at": datetime.utcnow().isoformat(), |
|
} |
|
|
|
async def get_stats(self): |
|
"""Get server statistics - simplified for home lab use""" |
|
return { |
|
"server": { |
|
"name": self.name, |
|
"version": self.version, |
|
"tools_count": len(self.get_tools()), |
|
}, |
|
"clients": { |
|
"total_clients": 0, |
|
"active_clients": 0, |
|
"inactive_clients": 0, |
|
"clients_active_last_hour": 0, |
|
"total_requests": 0, |
|
}, |
|
} |
|
|
|
def _load_gemini_config(self) -> Dict[str, Any]: |
|
"""Load Gemini configuration from environment or config file""" |
|
# Try to load .env file if it exists |
|
env_file = self.project_root / ".env" |
|
if env_file.exists(): |
|
try: |
|
with open(env_file, "r") as f: |
|
for line in f: |
|
line = line.strip() |
|
if line and not line.startswith("#") and "=" in line: |
|
key, value = line.split("=", 1) |
|
# Only set if not already in environment |
|
if key not in os.environ: |
|
os.environ[key] = value |
|
except Exception as e: |
|
self.logger.warning(f"Could not load .env file: {e}") |
|
|
|
config = { |
|
"enabled": os.getenv("GEMINI_ENABLED", "true").lower() == "true", |
|
"auto_consult": os.getenv("GEMINI_AUTO_CONSULT", "true").lower() == "true", |
|
"cli_command": os.getenv("GEMINI_CLI_COMMAND", "gemini"), |
|
"timeout": int(os.getenv("GEMINI_TIMEOUT", "60")), |
|
"rate_limit_delay": float(os.getenv("GEMINI_RATE_LIMIT", "2")), |
|
"max_context_length": int(os.getenv("GEMINI_MAX_CONTEXT", "4000")), |
|
"log_consultations": os.getenv("GEMINI_LOG_CONSULTATIONS", "true").lower() == "true", |
|
"model": os.getenv("GEMINI_MODEL", "gemini-2.5-flash"), |
|
"sandbox_mode": os.getenv("GEMINI_SANDBOX", "false").lower() == "true", |
|
"debug_mode": os.getenv("GEMINI_DEBUG", "false").lower() == "true", |
|
"include_history": os.getenv("GEMINI_INCLUDE_HISTORY", "true").lower() == "true", |
|
"max_history_entries": int(os.getenv("GEMINI_MAX_HISTORY", "10")), |
|
} |
|
|
|
# Try to load from config file |
|
config_file = self.project_root / "gemini-config.json" |
|
if config_file.exists(): |
|
try: |
|
with open(config_file, "r") as f: |
|
file_config = json.load(f) |
|
config.update(file_config) |
|
except Exception as e: |
|
self.logger.warning(f"Could not load gemini-config.json: {e}") |
|
|
|
return config |
|
|
|
    def _initialize_gemini(self):
        """Initialize Gemini integration with lazy loading.

        Returns the integration object from the gemini_integration module when
        importable; otherwise a MockGemini stand-in whose methods report the
        integration as disabled/unavailable, so callers never need None checks.
        """
        try:
            from gemini_integration import get_integration

            return get_integration(self.gemini_config)
        except ImportError as e:
            self.logger.error(f"Failed to import Gemini integration: {e}")

            # Return a mock object that always returns disabled status.
            # It mirrors the attributes/methods this class actually uses:
            # enabled, auto_consult, consult_gemini, clear_conversation_history,
            # get_statistics.
            class MockGemini:
                def __init__(self):
                    self.auto_consult = False
                    self.enabled = False

                async def consult_gemini(self, **kwargs):
                    return {
                        "status": "disabled",
                        "error": "Gemini integration not available",
                    }

                def clear_conversation_history(self):
                    return {"message": "Gemini integration not available"}

                def get_statistics(self):
                    return {}

            return MockGemini()
|
|
|
    def get_tools(self) -> Dict[str, Dict[str, Any]]:
        """Return available Gemini tools.

        Registry of tool name -> {description, parameters(JSON schema)}.
        Each key MUST match an async method name on this class: both the
        JSON-RPC and stdio dispatchers resolve tools via getattr(self, name).
        """
        return {
            "consult_gemini": {
                "description": "Consult Gemini AI for a second opinion or validation",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The question or code to consult Gemini about",
                        },
                        "context": {
                            "type": "string",
                            "description": "Additional context for the consultation",
                        },
                        "comparison_mode": {
                            "type": "boolean",
                            "default": True,
                            "description": "Compare with previous Claude response",
                        },
                        "force": {
                            "type": "boolean",
                            "default": False,
                            "description": "Force consultation even if disabled",
                        },
                    },
                    "required": ["query"],
                },
            },
            "clear_gemini_history": {
                "description": "Clear Gemini conversation history",
                "parameters": {"type": "object", "properties": {}},
            },
            "gemini_status": {
                "description": "Get Gemini integration status and statistics",
                "parameters": {"type": "object", "properties": {}},
            },
            "toggle_gemini_auto_consult": {
                "description": "Toggle automatic Gemini consultation on uncertainty detection",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "enable": {
                            "type": "boolean",
                            "description": "Enable or disable auto-consultation",
                        }
                    },
                },
            },
        }
|
|
|
async def consult_gemini( |
|
self, |
|
query: str, |
|
context: str = "", |
|
comparison_mode: bool = True, |
|
force: bool = False, |
|
) -> Dict[str, Any]: |
|
"""Consult Gemini AI for a second opinion |
|
|
|
Args: |
|
query: The question or code to consult about |
|
context: Additional context |
|
comparison_mode: Compare with previous Claude response |
|
force: Force consultation even if disabled |
|
|
|
Returns: |
|
Dictionary with consultation results |
|
""" |
|
if not query: |
|
return { |
|
"success": False, |
|
"error": "'query' parameter is required for Gemini consultation", |
|
} |
|
|
|
# Consult Gemini |
|
result = await self.gemini.consult_gemini( |
|
query=query, |
|
context=context, |
|
comparison_mode=comparison_mode, |
|
force_consult=force, |
|
) |
|
|
|
# Format the response |
|
formatted_response = self._format_gemini_response(result) |
|
|
|
return { |
|
"success": result.get("status") == "success", |
|
"result": formatted_response, |
|
"raw_result": result, |
|
} |
|
|
|
async def clear_gemini_history(self) -> Dict[str, Any]: |
|
"""Clear Gemini conversation history""" |
|
result = self.gemini.clear_conversation_history() |
|
return {"success": True, "message": result.get("message", "History cleared")} |
|
|
|
async def gemini_status(self) -> Dict[str, Any]: |
|
"""Get Gemini integration status and statistics""" |
|
stats = self.gemini.get_statistics() if hasattr(self.gemini, "get_statistics") else {} |
|
|
|
status_info = { |
|
"enabled": getattr(self.gemini, "enabled", False), |
|
"auto_consult": getattr(self.gemini, "auto_consult", False), |
|
"model": self.gemini_config.get("model", "unknown"), |
|
"timeout": self.gemini_config.get("timeout", 60), |
|
"statistics": stats, |
|
} |
|
|
|
return {"success": True, "status": status_info} |
|
|
|
async def toggle_gemini_auto_consult(self, enable: Optional[bool] = None) -> Dict[str, Any]: |
|
"""Toggle automatic Gemini consultation |
|
|
|
Args: |
|
enable: True to enable, False to disable, None to toggle |
|
|
|
Returns: |
|
Dictionary with new status |
|
""" |
|
if enable is None: |
|
# Toggle current state |
|
self.gemini.auto_consult = not getattr(self.gemini, "auto_consult", False) |
|
else: |
|
self.gemini.auto_consult = bool(enable) |
|
|
|
status = "enabled" if self.gemini.auto_consult else "disabled" |
|
return { |
|
"success": True, |
|
"status": status, |
|
"message": f"Gemini auto-consultation is now {status}", |
|
} |
|
|
|
def _format_gemini_response(self, result: Dict[str, Any]) -> str: |
|
"""Format Gemini consultation response""" |
|
output_lines = [] |
|
output_lines.append("🤖 Gemini Consultation Response") |
|
output_lines.append("=" * 40) |
|
output_lines.append("") |
|
|
|
if result["status"] == "success": |
|
output_lines.append(f"✅ Consultation ID: {result.get('consultation_id', 'N/A')}") |
|
output_lines.append(f"⏱️ Execution time: {result.get('execution_time', 0):.2f}s") |
|
output_lines.append("") |
|
|
|
# Display the raw response |
|
response = result.get("response", "") |
|
if response: |
|
output_lines.append("📄 Response:") |
|
output_lines.append(response) |
|
|
|
elif result["status"] == "disabled": |
|
output_lines.append("ℹ️ Gemini consultation is currently disabled") |
|
output_lines.append("💡 Enable with: toggle_gemini_auto_consult") |
|
|
|
elif result["status"] == "timeout": |
|
output_lines.append(f"❌ {result.get('error', 'Timeout error')}") |
|
output_lines.append("💡 Try increasing the timeout or simplifying the query") |
|
|
|
else: # error |
|
output_lines.append(f"❌ Error: {result.get('error', 'Unknown error')}") |
|
output_lines.append("") |
|
output_lines.append("💡 Troubleshooting:") |
|
output_lines.append(" 1. Check if Gemini CLI is installed and in PATH") |
|
output_lines.append(" 2. Verify Gemini CLI authentication") |
|
output_lines.append(" 3. Check the logs for more details") |
|
|
|
return "\n".join(output_lines) |
|
|
|
    async def run_stdio(self):
        """Run the server in stdio mode (for Claude desktop app).

        Builds a fresh MCP `Server`, registers list/call handlers backed by
        get_tools(), and blocks on the stdio transport until EOF.
        """
        server = Server(self.name)

        # Store tools and their functions for later access; only tools whose
        # name resolves to a method on this instance are callable.
        self._tools = self.get_tools()
        self._tool_funcs = {}
        for tool_name, tool_info in self._tools.items():
            tool_func = getattr(self, tool_name, None)
            if tool_func:
                self._tool_funcs[tool_name] = tool_func

        @server.list_tools()
        async def list_tools() -> List[types.Tool]:
            """List available tools"""
            tools = []
            for tool_name, tool_info in self._tools.items():
                tools.append(
                    types.Tool(
                        name=tool_name,
                        description=tool_info.get("description", ""),
                        inputSchema=tool_info.get("parameters", {}),
                    )
                )
            return tools

        @server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Call a tool with given arguments"""
            if name not in self._tool_funcs:
                return [types.TextContent(type="text", text=f"Tool '{name}' not found")]

            try:
                # Call the tool function
                result = await self._tool_funcs[name](**arguments)

                # Convert result to MCP response format: dicts as pretty
                # JSON, anything else via str().
                if isinstance(result, dict):
                    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
                return [types.TextContent(type="text", text=str(result))]
            except Exception as e:
                # Errors are reported as text content, not raised, so the
                # stdio session survives a failing tool call.
                self.logger.error(f"Error calling tool {name}: {str(e)}")
                return [types.TextContent(type="text", text=f"Error: {str(e)}")]

        # Run the stdio server (blocks until the transport closes)
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name=self.name,
                    server_version=self.version,
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
|
|
|
    def run_http(self):
        """Run the server in HTTP mode (blocking) via uvicorn.

        NOTE(review): binds 0.0.0.0 — every interface. Fine for a trusted
        home lab, but this exposes the unauthenticated OAuth-bypass endpoints
        to the whole network; confirm this is intended.
        """
        import uvicorn

        uvicorn.run(self.app, host="0.0.0.0", port=self.port)
|
|
|
def run(self, mode: str = "http"): |
|
"""Run the server in specified mode""" |
|
if mode == "stdio": |
|
asyncio.run(self.run_stdio()) |
|
elif mode == "http": |
|
self.run_http() |
|
else: |
|
raise ValueError(f"Unknown mode: {mode}. Use 'stdio' or 'http'.") |
|
|
|
|
|
def main():
    """Parse CLI arguments and run the Gemini MCP Server."""
    import argparse

    parser = argparse.ArgumentParser(description="Gemini AI Integration MCP Server")
    parser.add_argument(
        "--mode",
        choices=["http", "stdio"],
        default="stdio",  # stdio is the default transport for Gemini
        help="Server mode (http or stdio)",
    )
    parser.add_argument("--project-root", default=None, help="Project root directory")
    cli_args = parser.parse_args()

    GeminiMCPServer(project_root=cli_args.project_root).run(mode=cli_args.mode)
|
|
|
|
|
# Script entry point; defaults to stdio mode unless --mode http is given.
if __name__ == "__main__":
    main()
# NOTE(review): In my testing, the Gemini CLI appears to be stateless outside an
# interactive terminal — so it is unclear whether this server actually supports
# stateful back-and-forth conversations. TODO: confirm with the CLI maintainers.