Last active
April 9, 2026 09:07
-
-
Save IUHHUI/7e573e0b813c461c31aee9f5adee3ff6 to your computer and use it in GitHub Desktop.
Test Mcp Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| SSE (Server-Sent Events) MCP 服务测试脚本 | |
| 用于验证 SSE 方式的 MCP 服务功能 | |
| 运行方式: | |
| # 测试默认服务 | |
| python test_sse.py | |
| # 测试指定的 SSE 服务 | |
| python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/ | |
| # 使用 token 认证 | |
| python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/ --token "your_token_here" | |
| """ | |
| import argparse | |
| import asyncio | |
| import httpx | |
| import json | |
| import logging | |
# Configure the root logger once at import time: INFO level, with a timestamp
# and the level name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
class SSETester:
    """Smoke-tester for an MCP service exposed over SSE (Server-Sent Events).

    The tester opens the SSE stream, waits for the first event that carries a
    ``session_id``, and then submits a ``tools/list`` JSON-RPC request to the
    service's ``messages/`` endpoint. The outcome is recorded in
    ``self.results["status"]`` (``"success"`` or ``"failed"``).
    """

    # Field prefix of an SSE data line and the query key that carries the
    # session identifier in the announced endpoint.
    _DATA_PREFIX = "data:"
    _SESSION_KEY = "session_id="

    def __init__(self, url: str, token: str = None):
        """Create a tester.

        Args:
            url: MCP service address, e.g.
                http://127.0.0.1:8070/sse/stock_hk_f10/ . A trailing slash is
                appended when missing.
            token: Optional bearer token used for authentication.
        """
        self.url = url if url.endswith('/') else url + '/'
        self.token = token
        self.results = {
            "mode": "sse",
            "url": self.url,
            "status": None,
        }

    def _get_headers(self) -> dict:
        """Return the request headers (SSE ``Accept`` plus optional bearer token)."""
        headers = {"Accept": "text/event-stream"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        return headers

    @classmethod
    def _parse_data_line(cls, line: str):
        """Return the payload of an SSE ``data:`` line, or ``None`` for other lines.

        Only the leading field name is removed, so a payload that itself
        contains the text ``data:`` is returned intact.
        """
        stripped = line.strip()
        if not stripped.startswith(cls._DATA_PREFIX):
            return None
        return stripped[len(cls._DATA_PREFIX):].strip()

    @classmethod
    def _extract_session_id(cls, event_data: str):
        """Return the raw ``session_id`` value found in ``event_data``.

        Returns ``None`` when the payload carries no ``session_id=`` marker.
        The value stops at the next ``&`` or ``#`` so that additional query
        parameters or a fragment are not mistaken for part of the id.
        """
        _, marker, tail = event_data.partition(cls._SESSION_KEY)
        if not marker:
            return None
        return tail.split("&", 1)[0].split("#", 1)[0]

    async def _read_session_id(self, response):
        """Read SSE lines from ``response`` until one yields a ``session_id``.

        Read problems are logged and swallowed on purpose: the connection was
        already established, so the caller treats a missing id as "nothing
        more to test" rather than as a failure.

        Returns:
            The session id string, or ``None`` if none was received.
        """
        try:
            async for line in response.aiter_lines():
                event_data = self._parse_data_line(line)
                if event_data is None:
                    continue
                logger.info(f" ✓ 收到事件: {event_data[:80]}...")
                session_id = self._extract_session_id(event_data)
                if session_id is not None:
                    # e.g. /sse/stock_hk_f10/messages/?session_id=xxx
                    logger.info(f" ✓ 获得 session_id: {session_id[:20]}...")
                    return session_id
        except (asyncio.TimeoutError, httpx.TimeoutException):
            # httpx reports read timeouts with its own exception hierarchy.
            logger.info(" ⚠ 读取超时,但连接已建立")
        except Exception as e:
            logger.info(f" ⚠ 读取流时出错: {e}")
        return None

    async def _request_tool_list(self, client, headers: dict, session_id: str) -> None:
        """POST a ``tools/list`` JSON-RPC request for ``session_id`` and log the result.

        Failures are logged as warnings and never raised; this step is
        best-effort and does not affect the overall test status.
        """
        logger.info("\n3. 获取 tool list (tools/list)")
        try:
            # The messages URL is derived from the configured service URL.
            messages_url = self.url + f"messages/?session_id={session_id}"
            payload = {
                "jsonrpc": "2.0",
                "method": "tools/list",
                "id": 1,
            }
            logger.info(f" URL: {messages_url}")
            logger.info(f" 请求: {json.dumps(payload)}")
            post_headers = headers.copy()
            post_headers["Content-Type"] = "application/json"
            # In SSE mode the POST is only acknowledged here; the actual
            # tools/list result is delivered asynchronously on the SSE stream.
            post_response = await client.post(
                messages_url,
                json=payload,
                headers=post_headers,
                timeout=5.0,
            )
            logger.info(f" POST 状态码: {post_response.status_code}")
            if post_response.status_code in (200, 202, 204):
                logger.info(f" ✓ 请求已提交,返回 {post_response.status_code}")
                logger.info(" ℹ 注: SSE 模式下,响应会通过初始连接的 SSE 流异步返回")
                logger.info(" ℹ 由于这是演示脚本,实际应用需要保持 SSE 连接监听响应")
            else:
                logger.warning(f" ⚠ POST 返回状态码: {post_response.status_code}")
        except Exception as e:
            logger.warning(f" ⚠ 获取 tool list 失败: {e}")

    async def test_service(self) -> bool:
        """Run the SSE smoke test.

        Steps: open the SSE stream, read the initial event to obtain a session
        id, then (if an id was obtained) submit a ``tools/list`` request.

        Returns:
            ``True`` when the SSE endpoint answered with HTTP 200 and no
            unexpected error occurred; ``False`` otherwise. The same outcome
            is stored in ``self.results["status"]``.
        """
        headers = self._get_headers()
        logger.info("=" * 60)
        logger.info("测试 SSE 服务")
        logger.info(f"URL: {self.url}")
        logger.info("=" * 60)
        logger.info("\n1. 建立 SSE 流式连接")
        logger.info(f" URL: {self.url}")
        try:
            # Context managers guarantee the client and the stream are closed
            # on every exit path, including early returns and errors.
            async with httpx.AsyncClient(timeout=15.0) as client:
                async with client.stream(
                    "GET", self.url, headers=headers, timeout=5.0
                ) as response:
                    logger.info(f" 状态码: {response.status_code}")
                    content_type = response.headers.get("content-type", "")
                    logger.info(f" Content-Type: {content_type}")
                    if response.status_code != 200:
                        logger.warning(f" ✗ 连接失败,状态码: {response.status_code}")
                        self.results["status"] = "failed"
                        return False
                    logger.info(" ✓ SSE 连接建立成功")
                    logger.info("\n2. 接收 SSE 初始事件")
                    session_id = await self._read_session_id(response)
                    if session_id:
                        # Submit the request while the SSE stream is still
                        # open. NOTE(review): servers presumably tie the
                        # session to the live stream - confirm for this one.
                        await self._request_tool_list(client, headers, session_id)
            logger.info("\n✓ SSE 服务测试完成")
            self.results["status"] = "success"
            return True
        except Exception as e:
            # Top-level boundary of the test: report and convert to a result.
            logger.error(f"\n✗ SSE 服务测试失败: {e}")
            self.results["status"] = "failed"
            return False
async def main():
    """Parse command-line options, run the SSE test and log a summary.

    Returns:
        Process exit code: ``0`` when the test passed, ``1`` otherwise.
    """
    separator = "=" * 60

    parser = argparse.ArgumentParser(
        description="SSE MCP 服务测试脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 测试默认服务
python test_sse.py
# 测试指定的 SSE 服务
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/
# 使用 token 认证
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/ --token "your_token"
"""
    )
    parser.add_argument(
        "--url",
        type=str,
        default="http://127.0.0.1:8070/sse/stock_hk_f10/",
        help="MCP 服务地址 (默认: http://127.0.0.1:8070/sse/stock_hk_f10/)"
    )
    parser.add_argument(
        "--token",
        type=str,
        default=None,
        help="可选的认证 token,将作为 'Authorization: Bearer {token}' 头发送"
    )
    args = parser.parse_args()

    # Banner
    logger.info("\n" + separator)
    logger.info("SSE MCP 服务测试")
    logger.info(separator + "\n")

    tester = SSETester(url=args.url, token=args.token)
    passed = await tester.test_service()

    # Summary
    logger.info("\n" + separator)
    logger.info("测试总结")
    logger.info(separator)
    logger.info(f"模式: {tester.results['mode']}")
    logger.info(f"URL: {tester.results['url']}")
    logger.info(f"认证: {'是' if args.token else '否'}")
    logger.info(f"状态: {tester.results['status']}")
    if passed:
        logger.info("\n✓ 测试通过!服务正常工作")
    else:
        logger.info("\n✗ 测试失败!请检查服务状态")
    logger.info(separator + "\n")

    return 0 if passed else 1
if __name__ == "__main__":
    # Propagate main()'s result as the process exit status. SystemExit is
    # raised directly because the bare exit() helper is injected by the
    # ``site`` module for interactive use and is not guaranteed to exist.
    raise SystemExit(asyncio.run(main()))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Streamable-HTTP MCP 服务测试脚本 | |
| 运行方式: | |
| python test_streamable_http.py | |
| python test_streamable_http.py --url http://127.0.0.1:8060/stock_hk_f10/ | |
| python test_streamable_http.py --url http://127.0.0.1:8060/stock_hk_f10/ --token "your_token" | |
| """ | |
| import argparse | |
| import asyncio | |
| import httpx | |
| import json | |
| import time | |
# --- Configuration ---
DEFAULT_URL = "http://127.0.0.1:8060/stock_hk_f10/"
TIMEOUT = 10.0  # passed as the httpx client timeout

# JSON-RPC requests sent to the service, in order. Entries without an "id"
# are notifications.
test_cases = [
    {
        "name": "1. 初始化通知 (initialized)",
        "payload": {"method": "notifications/initialized", "jsonrpc": "2.0"},
    },
    {
        "name": "2. 获取工具列表 (tools/list)",
        "payload": {"method": "tools/list", "jsonrpc": "2.0", "id": 1},
    },
    {
        "name": "3. Ping 测试",
        "payload": {"method": "ping", "jsonrpc": "2.0", "id": 2},
    },
    {
        "name": "4. 获取提示词列表 (prompts/list)",
        "payload": {"method": "prompts/list", "jsonrpc": "2.0", "id": 3},
    },
]
| def parse_sse_response(raw: bytes) -> dict | None: | |
| """从 SSE 响应中解析 JSON 数据""" | |
| for line in raw.decode("utf-8").split("\n"): | |
| if line.startswith("data: "): | |
| try: | |
| return json.loads(line[6:]) | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
def print_tools(data: dict) -> None:
    """Print a numbered one-line summary of each tool in a ``tools/list`` reply.

    Nothing is printed when the reply has no usable ``result.tools`` list
    (for example an error reply, or ``"result": null``).

    Args:
        data: Decoded JSON-RPC response.
    """
    result = data.get("result") if isinstance(data, dict) else None
    tools = result.get("tools") if isinstance(result, dict) else None
    if not isinstance(tools, list) or not tools:
        return
    print(f"获取到 {len(tools)} 个 tools:")
    for i, tool in enumerate(tools, 1):
        if not isinstance(tool, dict):
            # Malformed entry: keep its position in the numbering but skip it.
            continue
        name = tool.get("name") or "unknown"
        # The description may be absent or null; treat both as empty.
        desc = tool.get("description") or ""
        desc_short = (desc[:50] + "...") if len(desc) > 50 else desc
        print(f" [{i}] {name} - {desc_short}")
def _show_response_body(body: bytes) -> None:
    """Print a truncated summary of a response body (SSE-framed or plain JSON)."""
    try:
        data = parse_sse_response(body)
    except UnicodeDecodeError:
        data = None
    if not data:
        # Not SSE-framed (or no JSON in it): try the body as plain JSON.
        try:
            data = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            print(f"返回结果: {body[:200]}")
            return
    print("响应摘要: " + json.dumps(data, indent=2, ensure_ascii=False)[:500] + "...")
    if isinstance(data, dict):
        print_tools(data)


async def run_tests(url: str, token: str | None = None) -> bool:
    """POST every entry of ``test_cases`` to the service and print a report.

    Each payload is sent to ``<url>/messages/``. A case passes when the HTTP
    status is 200 or 202; a request that raises counts as a failure. Per-case
    details and a final summary are printed to stdout.

    Args:
        url: Base address of the MCP service; a trailing slash is added when
            missing.
        token: Optional bearer token sent in the ``Authorization`` header.

    Returns:
        ``True`` when no case failed, ``False`` otherwise.
    """
    base_url = url if url.endswith("/") else url + "/"
    headers = {"Accept": "application/json, text/event-stream"}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    print(f"开始测试 Streamable-HTTP MCP 服务: {base_url}")
    print(f"认证: {'是' if token else '否'}")
    print("=" * 60)

    # Same endpoint and headers for every case, so build them once.
    target_url = base_url + "messages/"
    post_headers = {**headers, "Content-Type": "application/json"}

    # perf_counter is monotonic, so durations are immune to clock changes.
    total_start = time.perf_counter()
    results = []

    # NOTE(review): verify=False disables TLS certificate verification while a
    # bearer token may be sent. Presumably intended for local/self-signed
    # endpoints - confirm before pointing this at a remote service.
    async with httpx.AsyncClient(timeout=TIMEOUT, verify=False) as client:
        for case in test_cases:
            print(f"\n执行中: {case['name']}")
            print(f"请求路径: {target_url}")
            print(f"请求头: {json.dumps(post_headers, indent=2, ensure_ascii=False)}")
            print(f"Payload: {json.dumps(case['payload'], indent=2, ensure_ascii=False)}")

            start_time = time.perf_counter()
            try:
                async with client.stream(
                    "POST", target_url, json=case["payload"], headers=post_headers
                ) as response:
                    body = await response.aread()
                    status_code = response.status_code
            except Exception as e:
                # Per-case boundary: a failed request must not abort the run.
                duration = time.perf_counter() - start_time
                print(f"请求失败: {e}")
                print(f"单次耗时: {duration:.3f} 秒")
                results.append({"name": case["name"], "status": "FAIL", "code": "-", "duration": duration})
            else:
                duration = time.perf_counter() - start_time
                print(f"状态码: {status_code}")
                print(f"单次耗时: {duration:.3f} 秒")
                status = "PASS" if status_code in (200, 202) else "FAIL"
                results.append({"name": case["name"], "status": status, "code": status_code, "duration": duration})
                # Displaying the body happens after the result is recorded and
                # outside the request's try block, so a display problem can
                # never add a second entry for the same case.
                if body:
                    _show_response_body(body)
            print("-" * 60)

    total_duration = time.perf_counter() - total_start

    # Summary of all cases
    print("\n" + "=" * 60)
    print("测试结果汇总")
    print("=" * 60)
    passed = sum(1 for r in results if r["status"] == "PASS")
    failed = len(results) - passed
    for r in results:
        print(f" [{r['status']}] {r['name']} (状态码: {r['code']}, 耗时: {r['duration']:.3f}s)")
    print("-" * 60)
    print(f"通过: {passed}, 失败: {failed}, 总计: {len(results)}, 总耗时: {total_duration:.3f} 秒")
    print("=" * 60)

    return failed == 0
async def main() -> int:
    """Parse command-line options and run the Streamable-HTTP test cases.

    Returns:
        Process exit code: ``1`` when ``run_tests`` explicitly reports failure
        by returning ``False``, ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(description="Streamable-HTTP MCP 服务测试脚本")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help=f"MCP 服务地址 (默认: {DEFAULT_URL})")
    parser.add_argument("--token", type=str, default=None, help="认证 token")
    args = parser.parse_args()

    outcome = await run_tests(url=args.url, token=args.token)
    # Only an explicit False counts as failure; a run that reports nothing
    # (None) keeps the historical exit status of 0.
    return 1 if outcome is False else 0
if __name__ == "__main__":
    # Use main()'s result as the process exit status; a None result exits
    # with status 0, exactly like a normal return.
    raise SystemExit(asyncio.run(main()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment