Skip to content

Instantly share code, notes, and snippets.

@IUHHUI
Last active April 9, 2026 09:07
Show Gist options
  • Select an option

  • Save IUHHUI/7e573e0b813c461c31aee9f5adee3ff6 to your computer and use it in GitHub Desktop.

Select an option

Save IUHHUI/7e573e0b813c461c31aee9f5adee3ff6 to your computer and use it in GitHub Desktop.
Test Mcp Server
#!/usr/bin/env python3
"""
SSE (Server-Sent Events) MCP 服务测试脚本
用于验证 SSE 方式的 MCP 服务功能
运行方式:
# 测试默认服务
python test_sse.py
# 测试指定的 SSE 服务
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/
# 使用 token 认证
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/ --token "your_token_here"
"""
import argparse
import asyncio
import httpx
import json
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SSETester:
"""SSE MCP 服务测试器"""
def __init__(self, url: str, token: str = None):
"""
初始化测试器
Args:
url: MCP 服务地址,例如 http://127.0.0.1:8070/sse/stock_hk_f10/
token: 可选的认证 token
"""
self.url = url if url.endswith('/') else url + '/'
self.token = token
self.results = {
"mode": "sse",
"url": self.url,
"status": None
}
def _get_headers(self) -> dict:
"""获取请求头"""
headers = {
"Accept": "text/event-stream"
}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
async def test_service(self) -> bool:
"""测试 SSE 服务"""
headers = self._get_headers()
# 1. 建立初始 SSE 连接获取 session_id
logger.info("=" * 60)
logger.info("测试 SSE 服务")
logger.info(f"URL: {self.url}")
logger.info("=" * 60)
logger.info(f"\n1. 建立 SSE 流式连接")
logger.info(f" URL: {self.url}")
session_id = None
client = None
try:
# 使用持久化客户端保持连接
client = httpx.AsyncClient(timeout=15.0)
async with client.stream("GET", self.url, headers=headers, timeout=5.0) as response:
logger.info(f" 状态码: {response.status_code}")
content_type = response.headers.get("content-type", "")
logger.info(f" Content-Type: {content_type}")
if response.status_code == 200:
logger.info(" ✓ SSE 连接建立成功")
# 读取初始事件获取 session_id
logger.info(f"\n2. 接收 SSE 初始事件")
try:
async for line in response.aiter_lines():
if line.strip().startswith("data:"):
event_data = line.replace("data:", "").strip()
logger.info(f" ✓ 收到事件: {event_data[:80]}...")
# 从 endpoint 事件中提取 session_id
if "session_id=" in event_data:
# 例如: /sse/stock_hk_f10/messages/?session_id=xxx
session_id = event_data.split("session_id=")[-1]
logger.info(f" ✓ 获得 session_id: {session_id[:20]}...")
break
except asyncio.TimeoutError:
logger.info(" ⚠ 读取超时,但连接已建立")
except Exception as e:
logger.info(f" ⚠ 读取流时出错: {e}")
else:
logger.warning(f" ✗ 连接失败,状态码: {response.status_code}")
if client:
await client.aclose()
self.results["status"] = "failed"
return False
# 2. 获取并打印 tool list
if session_id:
logger.info(f"\n3. 获取 tool list (tools/list)")
try:
messages_url = self.url + f"messages/?session_id={session_id}"
payload = {
"jsonrpc": "2.0",
"method": "tools/list",
"id": 1
}
logger.info(f" URL: {messages_url}")
logger.info(f" 请求: {json.dumps(payload)}")
post_headers = headers.copy()
post_headers["Content-Type"] = "application/json"
# SSE 模式下,tool list 响应通过异步返回
# 发送 POST 请求,响应会通过原始 SSE 连接返回
post_response = await client.post(
messages_url,
json=payload,
headers=post_headers,
timeout=5.0
)
logger.info(f" POST 状态码: {post_response.status_code}")
if post_response.status_code in [200, 202, 204]:
logger.info(f" ✓ 请求已提交,返回 {post_response.status_code}")
logger.info(f" ℹ 注: SSE 模式下,响应会通过初始连接的 SSE 流异步返回")
logger.info(f" ℹ 由于这是演示脚本,实际应用需要保持 SSE 连接监听响应")
else:
logger.warning(f" ⚠ POST 返回状态码: {post_response.status_code}")
except Exception as e:
logger.warning(f" ⚠ 获取 tool list 失败: {e}")
if client:
await client.aclose()
logger.info("\n✓ SSE 服务测试完成")
self.results["status"] = "success"
return True
except Exception as e:
logger.error(f"\n✗ SSE 服务测试失败: {e}")
if client:
await client.aclose()
self.results["status"] = "failed"
return False
async def main():
"""主函数"""
parser = argparse.ArgumentParser(
description="SSE MCP 服务测试脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 测试默认服务
python test_sse.py
# 测试指定的 SSE 服务
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/
# 使用 token 认证
python test_sse.py --url http://127.0.0.1:8070/sse/stock_hk_f10/ --token "your_token"
"""
)
parser.add_argument(
"--url",
type=str,
default="http://127.0.0.1:8070/sse/stock_hk_f10/",
help="MCP 服务地址 (默认: http://127.0.0.1:8070/sse/stock_hk_f10/)"
)
parser.add_argument(
"--token",
type=str,
default=None,
help="可选的认证 token,将作为 'Authorization: Bearer {token}' 头发送"
)
args = parser.parse_args()
logger.info("\n" + "=" * 60)
logger.info("SSE MCP 服务测试")
logger.info("=" * 60 + "\n")
tester = SSETester(url=args.url, token=args.token)
result = await tester.test_service()
# 输出总结
logger.info("\n" + "=" * 60)
logger.info("测试总结")
logger.info("=" * 60)
logger.info(f"模式: {tester.results['mode']}")
logger.info(f"URL: {tester.results['url']}")
logger.info(f"认证: {'是' if args.token else '否'}")
logger.info(f"状态: {tester.results['status']}")
if result:
logger.info("\n✓ 测试通过!服务正常工作")
else:
logger.info("\n✗ 测试失败!请检查服务状态")
logger.info("=" * 60 + "\n")
return 0 if result else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)
#!/usr/bin/env python3
"""
Streamable-HTTP MCP 服务测试脚本
运行方式:
python test_streamable_http.py
python test_streamable_http.py --url http://127.0.0.1:8060/stock_hk_f10/
python test_streamable_http.py --url http://127.0.0.1:8060/stock_hk_f10/ --token "your_token"
"""
import argparse
import asyncio
import httpx
import json
import time
# --- 配置区 ---
DEFAULT_URL = "http://127.0.0.1:8060/stock_hk_f10/"
TIMEOUT = 10.0
# 测试用例列表
test_cases = [
{
"name": "1. 初始化通知 (initialized)",
"payload": {"method": "notifications/initialized", "jsonrpc": "2.0"},
},
{
"name": "2. 获取工具列表 (tools/list)",
"payload": {"method": "tools/list", "jsonrpc": "2.0", "id": 1},
},
{
"name": "3. Ping 测试",
"payload": {"method": "ping", "jsonrpc": "2.0", "id": 2},
},
{
"name": "4. 获取提示词列表 (prompts/list)",
"payload": {"method": "prompts/list", "jsonrpc": "2.0", "id": 3},
},
]
def parse_sse_response(raw: bytes) -> dict | None:
"""从 SSE 响应中解析 JSON 数据"""
for line in raw.decode("utf-8").split("\n"):
if line.startswith("data: "):
try:
return json.loads(line[6:])
except json.JSONDecodeError:
pass
return None
def print_tools(data: dict):
"""打印 tools/list 结果摘要"""
tools = data.get("result", {}).get("tools", [])
if not tools:
return
print(f"获取到 {len(tools)} 个 tools:")
for i, tool in enumerate(tools, 1):
name = tool.get("name", "unknown")
desc = tool.get("description", "")
desc_short = (desc[:50] + "...") if len(desc) > 50 else desc
print(f" [{i}] {name} - {desc_short}")
async def run_tests(url: str, token: str = None):
base_url = url if url.endswith("/") else url + "/"
headers = {"Accept": "application/json, text/event-stream"}
if token:
headers["Authorization"] = f"Bearer {token}"
print(f"开始测试 Streamable-HTTP MCP 服务: {base_url}")
print(f"认证: {'是' if token else '否'}")
print("=" * 60)
total_start = time.time()
results = []
async with httpx.AsyncClient(timeout=TIMEOUT, verify=False) as client:
for case in test_cases:
print(f"\n执行中: {case['name']}")
target_url = base_url + "messages/"
post_headers = {**headers, "Content-Type": "application/json"}
print(f"请求路径: {target_url}")
print(f"请求头: {json.dumps(post_headers, indent=2, ensure_ascii=False)}")
print(f"Payload: {json.dumps(case['payload'], indent=2, ensure_ascii=False)}")
start_time = time.time()
try:
async with client.stream(
"POST", target_url, json=case["payload"], headers=post_headers
) as response:
body = await response.aread()
duration = time.time() - start_time
print(f"状态码: {response.status_code}")
print(f"单次耗时: {duration:.3f} 秒")
status = "PASS" if response.status_code == 200 or response.status_code == 202 else "FAIL"
results.append({"name": case["name"], "status": status, "code": response.status_code, "duration": duration})
if body:
# 尝试解析 SSE 格式
data = parse_sse_response(body)
if data:
print("响应摘要: " + json.dumps(data, indent=2, ensure_ascii=False)[:500] + "...")
print_tools(data)
else:
# 尝试直接解析 JSON
try:
data = json.loads(body)
print("响应摘要: " + json.dumps(data, indent=2, ensure_ascii=False)[:500] + "...")
except (json.JSONDecodeError, UnicodeDecodeError):
print(f"返回结果: {body[:200]}")
except Exception as e:
duration = time.time() - start_time
print(f"请求失败: {e}")
print(f"单次耗时: {duration:.3f} 秒")
results.append({"name": case["name"], "status": "FAIL", "code": "-", "duration": duration})
print("-" * 60)
total_duration = time.time() - total_start
# 汇总所有测试结果
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
passed = sum(1 for r in results if r["status"] == "PASS")
failed = len(results) - passed
for r in results:
print(f" [{r['status']}] {r['name']} (状态码: {r['code']}, 耗时: {r['duration']:.3f}s)")
print("-" * 60)
print(f"通过: {passed}, 失败: {failed}, 总计: {len(results)}, 总耗时: {total_duration:.3f} 秒")
print("=" * 60)
async def main():
parser = argparse.ArgumentParser(description="Streamable-HTTP MCP 服务测试脚本")
parser.add_argument("--url", type=str, default=DEFAULT_URL, help=f"MCP 服务地址 (默认: {DEFAULT_URL})")
parser.add_argument("--token", type=str, default=None, help="认证 token")
args = parser.parse_args()
await run_tests(url=args.url, token=args.token)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment