Skip to content

Instantly share code, notes, and snippets.

@steipete
Last active November 4, 2025 13:32
Show Gist options
  • Save steipete/d6a3f1e3ac274bb8b2337227553496a1 to your computer and use it in GitHub Desktop.
Save steipete/d6a3f1e3ac274bb8b2337227553496a1 to your computer and use it in GitHub Desktop.
Use MCPs via CLI commands. Enable agents to add/remove/load MCPs on demand, progressive disclosure. No more context cluttering.
#!/usr/bin/env python3
"""MCP helper CLI using definitions from config/mcp_servers.json.
This script powers the `pnpm mcp:*` helpers. Typical invocations:
pnpm mcp:list
pnpm mcp:list chrome-devtools --schema
pnpm mcp:call playwright.browser_tabs action=list
pnpm mcp:call chrome-devtools.evaluate_script --args '{"function":"() => document.title"}'
pnpm mcp:call context7.get_library_docs topic=hooks tokens=1500
Argument syntax:
• Selector accepts `mcp.tool`, `mcp:tool`, or `mcp/tool`. Override pieces with `--mcp` / `--tool`.
• Structured payloads can be supplied as `--args '{"key":"value"}'`.
• Additional `key=value` tokens after the selector are merged in, coercing booleans, numbers, null,
or nested JSON automatically. Later keys override earlier ones.
• Use `--tail-log` to automatically tail log files returned by tools such as `get_logs`.
Examples
--------
pnpm mcp:list
pnpm mcp:call next-devtools.get_logs --tail-log
pnpm mcp:call chrome-devtools:list_pages
pnpm mcp:call context7.resolve_library_id libraryName=React
"""
from __future__ import annotations
import argparse
import json
import sys
import warnings
import threading
from pathlib import Path
from textwrap import indent
from typing import Any, Iterable
from mcp2py import load
# pnpm forwards an extra `--` when invoking scripts; strip those early.
if "--" in sys.argv:
sys.argv = [arg for arg in sys.argv if arg != "--"]
warnings.filterwarnings("ignore", message="coroutine method 'aclose'")
warnings.filterwarnings(
"ignore",
message="coroutine method 'aclose' of 'HTTP11ConnectionByteStream.__aiter__' was never awaited",
)
CONFIG_PATH = Path(__file__).resolve().parent.parent / "config" / "mcp_servers.json"
LIST_TIMEOUT_SECONDS = 60.0
def is_stdio_command(command: str | list[str]) -> bool:
"""Return True when the command already routes through our stdio wrapper."""
if isinstance(command, list):
return command[:2] == ["bash", "scripts/mcp_stdio_wrapper.sh"]
return command.startswith("bash scripts/mcp_stdio_wrapper.sh")
def wrap_stdio_command(command: str | list[str]) -> list[str]:
"""Ensure the command is executed via scripts/mcp_stdio_wrapper.sh."""
if isinstance(command, list):
return command
return ["bash", "scripts/mcp_stdio_wrapper.sh", command]
def fetch_with_timeout(func, timeout: float, *args, **kwargs):
"""Execute func(*args, **kwargs) with a timeout, returning (result, timed_out)."""
result_container = {}
exception_container = {}
def target():
try:
result_container['value'] = func(*args, **kwargs)
except Exception as exc: # noqa: BLE001
exception_container['value'] = exc
thread = threading.Thread(target=target, daemon=True)
thread.start()
thread.join(timeout)
if thread.is_alive():
return None, True
if 'value' in exception_container:
raise exception_container['value']
return result_container.get('value'), False
def command_to_string(command: str | list[str]) -> str:
"""Normalize list commands to a shell-friendly string."""
if isinstance(command, str):
return command
return " ".join(command)
def load_servers() -> list[dict[str, Any]]:
"""Load and validate MCP server definitions from config/mcp_servers.json."""
if not CONFIG_PATH.exists():
return []
try:
data = json.loads(CONFIG_PATH.read_text())
except (OSError, json.JSONDecodeError) as exc:
print(f"Failed to read {CONFIG_PATH}: {exc}", file=sys.stderr)
sys.exit(1)
servers: list[dict[str, Any]] = []
for item in data:
if isinstance(item, dict) and item.get("name") and item.get("command"):
servers.append(item)
return servers
def get_load_kwargs(command: str, entry: dict[str, Any] | None) -> dict[str, Any]:
"""Build keyword arguments for mcp2py.load() based on the server entry."""
if not entry or not isinstance(entry, dict):
return {}
kwargs: dict[str, Any] = {}
headers = entry.get("headers")
if headers:
kwargs["headers"] = headers
auth_option = entry.get("auth")
if auth_option == "oauth":
token_dir = entry.get("token_cache_dir")
if token_dir:
from fastmcp.client.auth.oauth import OAuth as FastMCPOAuth
from key_value.aio.stores.disk import DiskStore
import httpx
cache_path = Path(token_dir).expanduser()
cache_path.mkdir(parents=True, exist_ok=True)
disk_store = DiskStore(directory=cache_path)
command_str = command_to_string(command)
class PersistentOAuth(httpx.Auth):
def __init__(self) -> None:
self._oauth = FastMCPOAuth(
mcp_url=command_str,
client_name=entry.get("client_name", "mcp2py"),
token_storage=disk_store,
)
def auth_flow(self, request: httpx.Request):
return self._oauth.auth_flow(request)
def async_auth_flow(self, request: httpx.Request):
return self._oauth.async_auth_flow(request)
kwargs["auth"] = PersistentOAuth()
else:
kwargs["auth"] = "oauth"
elif auth_option:
kwargs["auth"] = auth_option
return kwargs
def resolve_target(
*,
servers: list[dict[str, Any]],
mcp_name: str | None,
override_command: str | None,
) -> tuple[str, dict[str, Any] | None]:
"""Return the command + entry for the requested MCP name or explicit override."""
if override_command:
return override_command, None
if mcp_name:
for entry in servers:
if entry.get("name") == mcp_name:
return entry["command"], entry
print(f"Unknown MCP '{mcp_name}'. Run 'pnpm mcp:list' to see available names.", file=sys.stderr)
sys.exit(1)
print("MCP name missing. Use '<name>.<tool>' or supply --mcp/--server.", file=sys.stderr)
sys.exit(1)
def maybe_parse_json(value: str) -> Any:
"""Attempt to parse a JSON literal, falling back to the raw string."""
try:
return json.loads(value)
except json.JSONDecodeError:
return value
def coerce_call_value(raw: str) -> Any:
"""Convert stringy key=value tokens into loosely typed values."""
value = raw.strip()
if value == "":
return ""
lowered = value.lower()
if lowered in {"true", "false"}:
return lowered == "true"
if lowered in {"null", "none"}:
return None
try:
if value.startswith("0") and value != "0" and not value.startswith("0."):
raise ValueError
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
if value.startswith("{") or value.startswith("["):
decoded = maybe_parse_json(value)
if isinstance(decoded, (dict, list)):
return decoded
return value
def parse_call_style_args(tokens: Iterable[str]) -> dict[str, Any]:
"""Parse key=value arguments passed after the selector."""
results: dict[str, Any] = {}
for token in tokens:
if "=" not in token:
raise ValueError(f"Argument '{token}' must be in key=value format.")
key, raw = token.split("=", 1)
key = key.strip()
if not key:
raise ValueError("Argument names cannot be empty.")
results[key] = coerce_call_value(raw)
return results
def infer_type(prop: dict[str, Any]) -> str:
"""Convert JSON-schema property definitions into a concise TypeScript-like type."""
t = prop.get("type")
if isinstance(t, list):
non_null = [entry for entry in t if entry != "null"]
if non_null:
t = non_null[0]
else:
t = t[0] if t else None
if t == "string":
if "enum" in prop:
values = prop["enum"]
if len(values) <= 5:
joined = " | ".join(f'"{v}"' for v in values)
return joined
return "string"
if t in {"integer", "number"}:
return "number"
if t == "boolean":
return "boolean"
if t == "array":
inner = "unknown"
items = prop.get("items")
if isinstance(items, dict):
inner = infer_type(items)
return f"{inner}[]"
if t == "object":
return "object"
if "anyOf" in prop:
options = [infer_type(option) for option in prop["anyOf"]]
return " | ".join(options)
if "oneOf" in prop:
options = [infer_type(option) for option in prop["oneOf"]]
return " | ".join(options)
return "unknown"
def schema_to_params(schema: dict[str, Any] | None) -> list[tuple[str, str, bool]]:
"""Extract (name, type, optional) tuples from a JSON-schema object."""
if not schema or not isinstance(schema, dict):
return []
if schema.get("type") != "object":
return []
properties = schema.get("properties", {})
required = set(schema.get("required") or [])
params: list[tuple[str, str, bool]] = []
for name, prop in properties.items():
type_str = infer_type(prop if isinstance(prop, dict) else {})
optional = name not in required
params.append((name, type_str, optional))
return params
def format_signature(name: str, schema: dict[str, Any] | None) -> str:
"""Render a function signature string from tool metadata."""
params = schema_to_params(schema)
if not params:
return f"{name}()"
parts = []
for param_name, type_str, optional in params:
suffix = "?" if optional else ""
parts.append(f"{param_name}{suffix}: {type_str}")
joined = ", ".join(parts)
return f"{name}({joined})"
def tail_log_if_requested(text: str, tail: bool) -> None:
"""Tail the last lines of a logfile when the tool response includes a colon path."""
if not tail or ":" not in text:
return
path_candidate = text.split(":", 1)[1].strip()
log_path = Path(path_candidate)
if not log_path.is_file():
print(f"[warn] Log path not found: {log_path}", file=sys.stderr)
return
print(f"\n# Tail of {log_path}")
for line in log_path.read_text().splitlines()[-20:]:
print(line)
def fetch_server_info(command: str, entry: dict[str, Any], include_schema: bool):
"""Collect available tools + metadata from an MCP server."""
load_kwargs = get_load_kwargs(command, entry)
warnings: list[str] = []
entries: list[dict[str, Any]] = []
try:
with load(command, **load_kwargs) as server:
tool_map = getattr(server, "_tools", {})
name_map = getattr(server, "_name_map", {})
if isinstance(tool_map, dict):
for original_name, tool_schema in tool_map.items():
# Rehydrate the public alias; fall back to the canonical name when no alias exists.
public_name = next((alias for alias, orig in name_map.items() if orig == original_name), original_name)
schema = tool_schema.get("inputSchema") if isinstance(tool_schema, dict) else None
description = tool_schema.get("description", "") if isinstance(tool_schema, dict) else ""
try:
signature = format_signature(public_name, schema)
except ValueError as signature_error:
signature = f"{public_name}(…)"
warnings.append(f"[warn] Could not build signature: {signature_error}")
entries.append({
"signature": signature,
"doc": description.splitlines()[0],
"schema": schema if include_schema else None,
})
return {"entries": entries, "warnings": warnings, "error": None}
except Exception as exc: # noqa: BLE001
return {"entries": None, "warnings": warnings, "error": exc}
def command_list(args: argparse.Namespace, servers: list[dict[str, Any]]) -> int:
"""Handle the `pnpm mcp:list` command."""
targets: Iterable[dict[str, Any]]
if args.mcp:
targets = [entry for entry in servers if entry.get("name") == args.mcp]
if not targets:
print(f"Unknown MCP '{args.mcp}'.", file=sys.stderr)
return 1
else:
targets = servers
targets = list(targets)
if not targets:
print("No MCP servers configured. Add entries to config/mcp_servers.json.")
return 0
for entry in targets:
command = entry.get("command")
if not command:
continue
if is_stdio_command(command):
# Wrap stdio servers so they inherit repo-relative execution and optional silencing.
command = wrap_stdio_command(command)
command_str = command_to_string(command)
name = entry.get("name", "<unknown>")
description = entry.get("description", "")
line = f"- {name}"
if description:
line += f" — {description}"
print(line)
result, timed_out = fetch_with_timeout(fetch_server_info, LIST_TIMEOUT_SECONDS, command, entry, args.schema)
if timed_out:
print(f" Tools: <timed out after {LIST_TIMEOUT_SECONDS:.0f}s>")
continue
if result["error"] is not None:
error = result["error"]
if entry.get("auth") == "oauth":
print(f" Tools: <authentication required – follow the OAuth URL above and rerun> ({error})")
else:
print(f" Tools: <failed to load> ({error})")
continue
entries = result["entries"] or []
warnings = result["warnings"]
if not entries:
print(" Tools: <none>")
continue
print(" Tools:")
for warning in warnings:
print(f" {warning}")
for entry_info in entries:
line = f" - {entry_info['signature']}"
if entry_info["doc"]:
line += f": {entry_info['doc']}"
print(line)
if args.schema and entry_info["schema"] is not None:
schema_text = json.dumps(entry_info["schema"], indent=2)
print(indent(schema_text, " "))
return 0
def command_call(args: argparse.Namespace, servers: list[dict[str, Any]]) -> int:
"""Handle the `pnpm mcp:call` command."""
selector = args.selector
mcp_name = args.mcp
tool_name = args.tool
if selector:
normalized = selector.replace('/', '.').replace(':', '.')
parts = normalized.split('.', 1)
if len(parts) == 2:
if not mcp_name:
mcp_name = parts[0]
if not tool_name:
tool_name = parts[1]
else:
value = parts[0]
if mcp_name and not tool_name:
tool_name = value
elif tool_name and not mcp_name:
mcp_name = value
elif args.server:
tool_name = value
else:
tool_name = value
if not mcp_name and not args.server:
print("MCP name missing. Use '<name>.<tool>' or supply --mcp/--server.", file=sys.stderr)
return 1
if not tool_name:
print('Tool name missing. Provide one via selector or --tool.', file=sys.stderr)
return 1
command, entry = resolve_target(
servers=servers,
mcp_name=mcp_name,
override_command=args.server,
)
load_kwargs = get_load_kwargs(command, entry)
with load(command, **load_kwargs) as server:
try:
tool = getattr(server, tool_name)
except AttributeError:
available = ", ".join(t.__name__ for t in server.tools)
print(f"Tool '{tool_name}' not found. Available: {available}", file=sys.stderr)
return 1
kwargs: dict[str, Any] = {}
if args.args:
parsed = maybe_parse_json(args.args)
if not isinstance(parsed, dict):
print("--args must decode to a JSON object", file=sys.stderr)
return 1
kwargs = dict(parsed)
if args.call_args:
try:
call_kwargs = parse_call_style_args(args.call_args)
except ValueError as exc:
print(f"Invalid call argument: {exc}", file=sys.stderr)
return 1
kwargs.update(call_kwargs)
result = tool(**kwargs)
if isinstance(result, str):
parsed = maybe_parse_json(result)
if isinstance(parsed, (dict, list)):
print(json.dumps(parsed, indent=2))
else:
print(result)
tail_log_if_requested(result, args.tail_log)
else:
print(json.dumps(result, indent=2, default=str))
return 0
def build_parser() -> argparse.ArgumentParser:
"""Construct the CLI parser with subcommands."""
parser = argparse.ArgumentParser(description="Interact with MCP servers via mcp2py")
sub = parser.add_subparsers(dest="command", required=True)
list_parser = sub.add_parser("list", help="List configured MCP servers")
list_parser.add_argument("mcp", nargs="?", help="Name of a specific MCP to display")
list_parser.add_argument("--schema", action="store_true", help="Print raw JSON schema for each tool")
list_parser.set_defaults(func=command_list)
call_parser = sub.add_parser("call", help="Invoke a tool on an MCP server")
call_parser.add_argument(
"selector",
nargs="?",
help="Optional shorthand '<mcp>.<tool>' (dot/colon/slash) or tool name",
)
call_parser.add_argument("--mcp", help="MCP name from config/mcp_servers.json")
call_parser.add_argument("--tool", help="Tool name when selector omits it")
call_parser.add_argument("--server", help="Override MCP command/URL")
call_parser.add_argument("--args", help="JSON object with tool arguments")
call_parser.add_argument(
"--tail-log",
action="store_true",
help="Tail log files returned by tools such as get_logs",
)
call_parser.add_argument(
"call_args",
nargs="*",
help="Optional key=value arguments (function-style) appended after the selector",
)
call_parser.set_defaults(func=command_call)
return parser
def main() -> int:
"""Entry point for the CLI."""
parser = build_parser()
args = parser.parse_args()
servers = load_servers()
return args.func(args, servers)
if __name__ == "__main__":
sys.exit(main())
#!/usr/bin/env bash
# Generic wrapper for stdio MCP servers.
# Keeps repo-relative resolution for stdio commands while allowing optional output suppression.
set -euo pipefail
cd "$(dirname "$0")/.."
if [[ "${MCP_STDIO_SILENT:-0}" == "1" ]]; then
exec "$@" >/dev/null 2>&1
else
exec "$@"
fi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment