Last active
          November 4, 2025 13:32 
        
      - 
      
 - 
        
Save steipete/d6a3f1e3ac274bb8b2337227553496a1 to your computer and use it in GitHub Desktop.  
    Use MCPs via CLI commands. Enable agents to add/remove/load MCPs on demand, progressive disclosure. No more context cluttering.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python3 | |
| """MCP helper CLI using definitions from config/mcp_servers.json. | |
| This script powers the `pnpm mcp:*` helpers. Typical invocations: | |
| pnpm mcp:list | |
| pnpm mcp:list chrome-devtools --schema | |
| pnpm mcp:call playwright.browser_tabs action=list | |
| pnpm mcp:call chrome-devtools.evaluate_script --args '{"function":"() => document.title"}' | |
| pnpm mcp:call context7.get_library_docs topic=hooks tokens=1500 | |
| Argument syntax: | |
| • Selector accepts `mcp.tool`, `mcp:tool`, or `mcp/tool`. Override pieces with `--mcp` / `--tool`. | |
| • Structured payloads can be supplied as `--args '{"key":"value"}'`. | |
| • Additional `key=value` tokens after the selector are merged in, coercing booleans, numbers, null, | |
| or nested JSON automatically. Later keys override earlier ones. | |
| • Use `--tail-log` to automatically tail log files returned by tools such as `get_logs`. | |
| Examples | |
| -------- | |
| pnpm mcp:list | |
| pnpm mcp:call next-devtools.get_logs --tail-log | |
| pnpm mcp:call chrome-devtools:list_pages | |
| pnpm mcp:call context7.resolve_library_id libraryName=React | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| import warnings | |
| import threading | |
| from pathlib import Path | |
| from textwrap import indent | |
| from typing import Any, Iterable | |
| from mcp2py import load | |
# pnpm forwards an extra `--` token when it invokes package scripts; remove
# every occurrence up front so the argument parser never sees it.
if sys.argv.count("--"):
    sys.argv = list(filter(lambda token: token != "--", sys.argv))

# Suppress "never awaited" warnings about `aclose` coroutines. The `message`
# argument is a regular expression matched against the start of the warning
# text, so the first (shorter) pattern already covers the second one; both are
# kept so the installed filters are unchanged.
warnings.filterwarnings(action="ignore", message="coroutine method 'aclose'")
warnings.filterwarnings(
    action="ignore",
    message=(
        "coroutine method 'aclose' of 'HTTP11ConnectionByteStream.__aiter__' "
        "was never awaited"
    ),
)

# Server definitions live in <parent of this script's directory>/config/.
CONFIG_PATH = Path(__file__).resolve().parents[1].joinpath("config", "mcp_servers.json")

# Upper bound, in seconds, that `command_list` waits for one server's tool list.
LIST_TIMEOUT_SECONDS = 60.0
def is_stdio_command(command: str | list[str]) -> bool:
    """Tell whether *command* is already routed through the stdio wrapper.

    Args:
        command: Either a single command-line string or an argv-style list.

    Returns:
        True when a list starts with ``["bash", "scripts/mcp_stdio_wrapper.sh"]``
        or a string starts with ``"bash scripts/mcp_stdio_wrapper.sh"``.
    """
    if not isinstance(command, list):
        return command.startswith("bash scripts/mcp_stdio_wrapper.sh")
    leading_tokens = command[:2]
    return leading_tokens == ["bash", "scripts/mcp_stdio_wrapper.sh"]
def wrap_stdio_command(command: str | list[str]) -> list[str]:
    """Return an argv list that runs *command* via scripts/mcp_stdio_wrapper.sh.

    The wrapper script ends in ``exec "$@"``, so the wrapped command has to be
    handed over as separate argv tokens. A string command is therefore split
    with shell rules instead of being passed along as one opaque argument.
    Wrapping is idempotent: a command that already starts with the wrapper
    prefix is returned tokenised but otherwise unchanged.

    Args:
        command: A command-line string or an argv-style list.

    Returns:
        A new list beginning with ``["bash", "scripts/mcp_stdio_wrapper.sh"]``.

    Raises:
        ValueError: If a string command has unbalanced quotes (from ``shlex``).
    """
    import shlex

    wrapper_prefix = ["bash", "scripts/mcp_stdio_wrapper.sh"]
    if isinstance(command, list):
        argv = list(command)  # copy so the caller's list is never aliased
    else:
        argv = shlex.split(command)

    if argv[:2] == wrapper_prefix:
        return argv
    return wrapper_prefix + argv
def fetch_with_timeout(func, timeout: float, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` on a helper thread, waiting at most *timeout* seconds.

    Args:
        func: Callable to execute.
        timeout: Maximum number of seconds to wait for the call to finish.
        *args: Positional arguments forwarded to *func*.
        **kwargs: Keyword arguments forwarded to *func*.

    Returns:
        ``(result, False)`` when the call finished in time, or ``(None, True)``
        when it was still running after *timeout* seconds. The worker is a
        daemon thread and is not cancelled; it is simply left behind.

    Raises:
        Exception: Whatever ``Exception`` subclass *func* raised, re-raised on
            the calling thread.
    """
    outcome = {}

    def runner():
        # Record either the return value or the exception, never both.
        try:
            outcome["result"] = func(*args, **kwargs)
        except Exception as error:  # noqa: BLE001
            outcome["error"] = error

    worker = threading.Thread(target=runner, daemon=True)
    worker.start()
    worker.join(timeout)

    if worker.is_alive():
        return None, True

    failure = outcome.get("error")
    if "error" in outcome:
        raise failure
    return outcome.get("result"), False
def command_to_string(command: str | list[str]) -> str:
    """Render *command* as a single string.

    Strings are returned untouched; any other value is treated as a sequence of
    tokens and joined with single spaces (no shell quoting is applied).
    """
    return " ".join(command) if not isinstance(command, str) else command
def load_servers() -> list[dict[str, Any]]:
    """Load MCP server definitions from ``CONFIG_PATH``.

    Returns:
        The entries of the top-level JSON array that are objects with truthy
        ``name`` and ``command`` fields, in file order. An empty list is
        returned when the file does not exist or when its top-level value is
        not an array (a warning is printed to stderr in the latter case).

    Raises:
        SystemExit: With status 1 when the file cannot be read, is not valid
            UTF-8, or is not valid JSON.
    """
    if not CONFIG_PATH.exists():
        return []

    try:
        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        data = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f"Failed to read {CONFIG_PATH}: {exc}", file=sys.stderr)
        sys.exit(1)

    if not isinstance(data, list):
        # Iterating a non-list would either crash (null, numbers) or silently
        # walk dict keys / string characters; report it instead.
        print(
            f"Ignoring {CONFIG_PATH}: expected a JSON array of server entries, "
            f"got {type(data).__name__}.",
            file=sys.stderr,
        )
        return []

    return [
        item
        for item in data
        if isinstance(item, dict) and item.get("name") and item.get("command")
    ]
def get_load_kwargs(command: str, entry: dict[str, Any] | None) -> dict[str, Any]:
    """Translate a server entry into keyword arguments for ``mcp2py.load()``.

    Args:
        command: The server command or URL; used as ``mcp_url`` when a
            persistent OAuth helper is built.
        entry: The server's config entry, or None when no entry applies.

    Returns:
        A dict that may contain ``headers`` (copied from the entry when truthy)
        and ``auth``. ``auth`` is the entry's ``auth`` value, except that
        ``"oauth"`` combined with a ``token_cache_dir`` yields an ``httpx.Auth``
        object whose tokens are stored in that directory. An empty dict is
        returned for a missing or non-dict entry.
    """
    if not (entry and isinstance(entry, dict)):
        return {}

    options: dict[str, Any] = {}

    extra_headers = entry.get("headers")
    if extra_headers:
        options["headers"] = extra_headers

    auth_mode = entry.get("auth")
    if auth_mode != "oauth":
        # Any other truthy auth value is forwarded verbatim.
        if auth_mode:
            options["auth"] = auth_mode
        return options

    cache_dir = entry.get("token_cache_dir")
    if not cache_dir:
        # No cache directory configured: pass the plain "oauth" marker through.
        options["auth"] = "oauth"
        return options

    # Imported lazily so these packages are only needed for this code path.
    from fastmcp.client.auth.oauth import OAuth as FastMCPOAuth
    from key_value.aio.stores.disk import DiskStore
    import httpx

    storage_root = Path(cache_dir).expanduser()
    storage_root.mkdir(parents=True, exist_ok=True)
    token_store = DiskStore(directory=storage_root)
    server_url = command_to_string(command)

    class PersistentOAuth(httpx.Auth):
        """httpx auth hook that delegates to a FastMCP OAuth client using the disk store."""

        def __init__(self) -> None:
            self._oauth = FastMCPOAuth(
                mcp_url=server_url,
                client_name=entry.get("client_name", "mcp2py"),
                token_storage=token_store,
            )

        def auth_flow(self, request: httpx.Request):
            return self._oauth.auth_flow(request)

        def async_auth_flow(self, request: httpx.Request):
            return self._oauth.async_auth_flow(request)

    options["auth"] = PersistentOAuth()
    return options
def resolve_target(
    *,
    servers: list[dict[str, Any]],
    mcp_name: str | None,
    override_command: str | None,
) -> tuple[str, dict[str, Any] | None]:
    """Pick the command to run for a call, plus the config entry it came from.

    Args:
        servers: Configured server entries.
        mcp_name: Name of the server to look up, if any.
        override_command: Explicit command/URL that bypasses the lookup.

    Returns:
        ``(override_command, None)`` when an override is given; otherwise
        ``(entry["command"], entry)`` for the first entry named *mcp_name*.

    Raises:
        SystemExit: With status 1 (after printing to stderr) when no name was
            supplied or the name matches no entry.
    """
    if override_command:
        return override_command, None

    if not mcp_name:
        print("MCP name missing. Use '<name>.<tool>' or supply --mcp/--server.", file=sys.stderr)
        sys.exit(1)

    matches = (candidate for candidate in servers if candidate.get("name") == mcp_name)
    found = next(matches, None)
    if found is None:
        print(f"Unknown MCP '{mcp_name}'. Run 'pnpm mcp:list' to see available names.", file=sys.stderr)
        sys.exit(1)

    return found["command"], found
def maybe_parse_json(value: str) -> Any:
    """Decode *value* as JSON when possible; otherwise hand it back unchanged.

    Only ``json.JSONDecodeError`` is swallowed, so any valid JSON document
    (including bare numbers, strings, ``true``/``false``/``null``) is decoded.
    """
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        decoded = value
    return decoded
def coerce_call_value(raw: str) -> Any:
    """Convert the value part of a ``key=value`` token into a loosely typed value.

    Conversion rules, applied in order to the whitespace-stripped text:

    * empty text                       -> ``""``
    * ``true`` / ``false`` (any case)  -> bool
    * ``null`` / ``none`` (any case)   -> ``None``
    * zero-padded digits such as ``007`` or ``00123`` -> kept as a string, so
      identifiers and codes do not lose their padding
    * integer literal                  -> int
    * finite float literal             -> float
    * text starting with ``{`` or ``[`` that decodes to a JSON object/array
      -> dict / list
    * anything else                    -> the stripped string

    ``nan``, ``inf`` and ``infinity`` stay strings: they are ordinary words far
    more often than numbers, and non-finite floats are not valid JSON.

    Args:
        raw: Text that followed the first ``=`` in a call-style argument.

    Returns:
        The coerced value.
    """
    import math

    value = raw.strip()
    if value == "":
        return ""

    lowered = value.lower()
    if lowered in {"true", "false"}:
        return lowered == "true"
    if lowered in {"null", "none"}:
        return None

    # Leading zero that is neither a bare "0" nor a decimal such as "0.5".
    zero_prefixed = value.startswith("0") and value != "0" and not value.startswith("0.")
    if zero_prefixed and value[1].isdigit():
        # Previously fell through to float() and came back as e.g. 7.0.
        return value

    if not zero_prefixed:
        try:
            return int(value)
        except ValueError:
            pass

    try:
        number = float(value)
    except ValueError:
        pass
    else:
        if math.isfinite(number):
            return number

    if value.startswith(("{", "[")):
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError:
            return value
        if isinstance(decoded, (dict, list)):
            return decoded

    return value
def parse_call_style_args(tokens: Iterable[str]) -> dict[str, Any]:
    """Turn ``key=value`` tokens into a dict of coerced values.

    Each token is split at its first ``=``; the key is stripped of surrounding
    whitespace and the value is passed through ``coerce_call_value``. When a
    key repeats, the last occurrence wins.

    Args:
        tokens: The tokens that followed the selector on the command line.

    Returns:
        Mapping of argument name to coerced value.

    Raises:
        ValueError: If a token has no ``=`` or its key is empty.
    """
    parsed: dict[str, Any] = {}
    for item in tokens:
        name, separator, payload = item.partition("=")
        if not separator:
            raise ValueError(f"Argument '{item}' must be in key=value format.")
        name = name.strip()
        if name == "":
            raise ValueError("Argument names cannot be empty.")
        parsed[name] = coerce_call_value(payload)
    return parsed
def infer_type(prop: dict[str, Any]) -> str:
    """Render a JSON-schema property as a concise TypeScript-like type string.

    Rules:

    * ``type`` given as a list uses its first non-``"null"`` member (or the
      first member when all are ``"null"``).
    * ``string`` with a non-empty ``enum`` of at most five values becomes a
      union of quoted literals; otherwise ``string``.
    * ``integer`` / ``number`` -> ``number``; ``boolean``, ``object`` and
      ``null`` map to themselves.
    * ``array`` -> ``<item type>[]``; a union item type is parenthesised and a
      missing or non-dict ``items`` yields ``unknown[]``.
    * otherwise the first non-empty ``anyOf`` / ``oneOf`` list is rendered as a
      ``|`` union with duplicates removed; non-dict members show as ``unknown``.
    * anything else -> ``unknown``.

    Args:
        prop: A JSON-schema fragment describing one value.

    Returns:
        The type string.
    """
    declared = prop.get("type")
    if isinstance(declared, list):
        non_null = [member for member in declared if member != "null"]
        if non_null:
            declared = non_null[0]
        else:
            declared = declared[0] if declared else None

    if declared == "string":
        choices = prop.get("enum")
        # An empty enum would otherwise render as an empty type string.
        if isinstance(choices, list) and 0 < len(choices) <= 5:
            return " | ".join(f'"{choice}"' for choice in choices)
        return "string"
    if declared in ("integer", "number"):
        return "number"
    if declared == "boolean":
        return "boolean"
    if declared == "array":
        items = prop.get("items")
        inner = infer_type(items) if isinstance(items, dict) else "unknown"
        if " | " in inner:
            # Without parentheses "a | b[]" reads as "a, or array of b".
            inner = f"({inner})"
        return f"{inner}[]"
    if declared == "object":
        return "object"
    if declared == "null":
        # Commonly seen as the second member of anyOf for optional values.
        return "null"

    for keyword in ("anyOf", "oneOf"):
        options = prop.get(keyword)
        if isinstance(options, list) and options:
            rendered = [
                infer_type(option) if isinstance(option, dict) else "unknown"
                for option in options
            ]
            # dict.fromkeys removes duplicates while keeping first-seen order.
            return " | ".join(dict.fromkeys(rendered))

    return "unknown"
def schema_to_params(schema: dict[str, Any] | None) -> list[tuple[str, str, bool]]:
    """List the parameters described by an object-typed JSON schema.

    Args:
        schema: A tool's input schema, or None.

    Returns:
        One ``(name, type_string, optional)`` tuple per entry in
        ``properties``, in schema order. ``optional`` is True when the name is
        absent from ``required``. Returns an empty list when *schema* is
        falsy, not a dict, or its ``type`` is not ``"object"``.
    """
    if not (schema and isinstance(schema, dict)):
        return []
    if schema.get("type") != "object":
        return []

    fields = schema.get("properties", {})
    mandatory = set(schema.get("required") or [])
    return [
        # Non-dict property definitions are rendered as if they were empty.
        (key, infer_type(spec if isinstance(spec, dict) else {}), key not in mandatory)
        for key, spec in fields.items()
    ]
def format_signature(name: str, schema: dict[str, Any] | None) -> str:
    """Build a ``name(param: type, other?: type)`` string for a tool.

    Parameters come from ``schema_to_params``; optional ones get a ``?``
    suffix. A tool without parameters renders as ``name()``.
    """
    rendered = [
        f"{param}{'?' if is_optional else ''}: {kind}"
        for param, kind, is_optional in schema_to_params(schema)
    ]
    return f"{name}({', '.join(rendered)})"
def tail_log_if_requested(text: str, tail: bool) -> None:
    """Print the last 20 lines of the log file named in a tool response.

    Everything after the first ``:`` in *text* (whitespace-stripped) is taken
    as the file path. Nothing happens unless *tail* is true and *text*
    contains a colon. If the path is not an existing file, a warning goes to
    stderr instead.

    Args:
        text: The tool's textual response.
        tail: Whether tailing was requested.
    """
    if not (tail and ":" in text):
        return

    _label, _colon, remainder = text.partition(":")
    log_path = Path(remainder.strip())

    if log_path.is_file():
        print(f"\n# Tail of {log_path}")
        recent = log_path.read_text().splitlines()[-20:]
        for row in recent:
            print(row)
        return

    print(f"[warn] Log path not found: {log_path}", file=sys.stderr)
def fetch_server_info(command: str, entry: dict[str, Any], include_schema: bool):
    """Connect to one MCP server and describe the tools it exposes.

    Args:
        command: Command or URL handed to ``mcp2py.load``.
        entry: The server's config entry (used to build load options).
        include_schema: When true, each tool's raw input schema is returned.

    Returns:
        A dict with three keys:

        * ``entries``: list of ``{"signature", "doc", "schema"}`` dicts, or
          None when loading failed. ``doc`` is the first non-blank line of the
          tool description (``""`` when there is none).
        * ``warnings``: human-readable notes gathered while building signatures.
        * ``error``: the exception that aborted loading, or None on success.

        Failures are reported through ``error`` instead of being raised, so
        one broken server does not stop the caller from listing the others.
    """
    notices: list[str] = []
    entries: list[dict[str, Any]] = []
    try:
        # Built inside the try block so that option-building failures (for
        # example a missing optional package) are reported like load failures.
        load_kwargs = get_load_kwargs(command, entry)
        with load(command, **load_kwargs) as server:
            # NOTE(review): relies on the private `_tools` / `_name_map`
            # attributes of the loaded server object; confirm against mcp2py.
            tool_map = getattr(server, "_tools", {})
            name_map = getattr(server, "_name_map", {})
            if isinstance(tool_map, dict):
                # Invert alias -> original once, keeping the first alias seen
                # for each original name (same choice as a linear search).
                alias_for: dict[Any, Any] = {}
                if isinstance(name_map, dict):
                    for alias, original in name_map.items():
                        alias_for.setdefault(original, alias)

                for original_name, tool_schema in tool_map.items():
                    # Prefer the public alias; fall back to the canonical name.
                    public_name = alias_for.get(original_name, original_name)
                    is_mapping = isinstance(tool_schema, dict)
                    schema = tool_schema.get("inputSchema") if is_mapping else None
                    description = tool_schema.get("description") if is_mapping else ""
                    # A missing, null or empty description must not abort the
                    # whole listing (indexing an empty line list used to).
                    doc_lines = str(description or "").strip().splitlines()
                    summary = doc_lines[0] if doc_lines else ""
                    try:
                        signature = format_signature(public_name, schema)
                    except ValueError as signature_error:
                        signature = f"{public_name}(…)"
                        notices.append(f"[warn] Could not build signature: {signature_error}")
                    entries.append({
                        "signature": signature,
                        "doc": summary,
                        "schema": schema if include_schema else None,
                    })
        return {"entries": entries, "warnings": notices, "error": None}
    except Exception as exc:  # noqa: BLE001
        return {"entries": None, "warnings": notices, "error": exc}
def command_list(args: argparse.Namespace, servers: list[dict[str, Any]]) -> int:
    """Handle the `pnpm mcp:list` command.

    Prints every configured server (or only ``args.mcp`` when given) followed
    by the tools it exposes; with ``args.schema`` each tool's JSON schema is
    printed as well.

    Args:
        args: Parsed CLI arguments; reads ``mcp`` and ``schema``.
        servers: Configured server entries.

    Returns:
        Process exit status: 1 when ``args.mcp`` names an unknown server,
        otherwise 0 (per-server failures are reported inline).
    """
    if args.mcp:
        targets = [entry for entry in servers if entry.get("name") == args.mcp]
        if not targets:
            print(f"Unknown MCP '{args.mcp}'.", file=sys.stderr)
            return 1
    else:
        targets = list(servers)

    if not targets:
        print("No MCP servers configured. Add entries to config/mcp_servers.json.")
        return 0

    for entry in targets:
        command = entry.get("command")
        if not command:
            continue
        # NOTE(review): only commands that already start with the wrapper are
        # passed to wrap_stdio_command — confirm whether unwrapped stdio
        # commands were meant to be wrapped here as well.
        if is_stdio_command(command):
            command = wrap_stdio_command(command)

        name = entry.get("name", "<unknown>")
        description = entry.get("description", "")
        heading = f"- {name}"
        if description:
            heading += f" — {description}"
        print(heading)

        result, timed_out = fetch_with_timeout(
            fetch_server_info, LIST_TIMEOUT_SECONDS, command, entry, args.schema
        )
        if timed_out:
            print(f" Tools: <timed out after {LIST_TIMEOUT_SECONDS:.0f}s>")
            continue
        if result is None:
            # The worker ended without returning a result dict (for example it
            # was stopped by a non-Exception error); treat it as a load failure
            # instead of crashing on the subscripts below.
            print(" Tools: <failed to load> (no result returned)")
            continue
        if result["error"] is not None:
            error = result["error"]
            if entry.get("auth") == "oauth":
                print(f" Tools: <authentication required – follow the OAuth URL above and rerun> ({error})")
            else:
                print(f" Tools: <failed to load> ({error})")
            continue

        tool_entries = result["entries"] or []
        # Named `notices` so the imported `warnings` module is not shadowed.
        notices = result["warnings"]
        if not tool_entries:
            print(" Tools: <none>")
            continue

        print(" Tools:")
        for notice in notices:
            print(f" {notice}")
        for entry_info in tool_entries:
            line = f" - {entry_info['signature']}"
            if entry_info["doc"]:
                line += f": {entry_info['doc']}"
            print(line)
            if args.schema and entry_info["schema"] is not None:
                schema_text = json.dumps(entry_info["schema"], indent=2)
                print(indent(schema_text, " "))
    return 0
def command_call(args: argparse.Namespace, servers: list[dict[str, Any]]) -> int:
    """Handle the `pnpm mcp:call` command.

    Resolves the target server and tool from the selector and/or the
    ``--mcp`` / ``--tool`` / ``--server`` options, merges ``--args`` JSON with
    trailing ``key=value`` tokens (later keys win), invokes the tool and
    prints its result. All command-line input is validated before the server
    is launched, so a typo never costs a server start-up.

    Args:
        args: Parsed CLI arguments; reads ``selector``, ``mcp``, ``tool``,
            ``server``, ``args``, ``call_args`` and ``tail_log``.
        servers: Configured server entries.

    Returns:
        Process exit status: 0 on success, 1 on invalid input or unknown tool.

    Raises:
        SystemExit: From ``resolve_target`` when the server name is unknown.
    """
    mcp_name = args.mcp
    tool_name = args.tool

    if args.selector:
        # `mcp.tool`, `mcp:tool` and `mcp/tool` are interchangeable.
        normalized = args.selector.replace("/", ".").replace(":", ".")
        head, separator, rest = normalized.partition(".")
        if separator:
            # Explicit --mcp / --tool take precedence over selector pieces.
            if not mcp_name:
                mcp_name = head
            if not tool_name:
                tool_name = rest
        elif tool_name and not mcp_name:
            # A bare selector next to --tool can only be the server name.
            mcp_name = normalized
        else:
            # In every other combination a bare selector names the tool.
            tool_name = normalized

    if not mcp_name and not args.server:
        print("MCP name missing. Use '<name>.<tool>' or supply --mcp/--server.", file=sys.stderr)
        return 1
    if not tool_name:
        print('Tool name missing. Provide one via selector or --tool.', file=sys.stderr)
        return 1

    command, entry = resolve_target(
        servers=servers,
        mcp_name=mcp_name,
        override_command=args.server,
    )

    # Build the tool arguments up front so bad input fails fast.
    kwargs: dict[str, Any] = {}
    if args.args:
        parsed_args = maybe_parse_json(args.args)
        if not isinstance(parsed_args, dict):
            print("--args must decode to a JSON object", file=sys.stderr)
            return 1
        kwargs = dict(parsed_args)
    if args.call_args:
        try:
            call_kwargs = parse_call_style_args(args.call_args)
        except ValueError as exc:
            print(f"Invalid call argument: {exc}", file=sys.stderr)
            return 1
        kwargs.update(call_kwargs)

    load_kwargs = get_load_kwargs(command, entry)
    with load(command, **load_kwargs) as server:
        try:
            tool = getattr(server, tool_name)
        except AttributeError:
            available = ", ".join(t.__name__ for t in server.tools)
            print(f"Tool '{tool_name}' not found. Available: {available}", file=sys.stderr)
            return 1

        result = tool(**kwargs)
        if isinstance(result, str):
            # Pretty-print JSON payloads; show any other text verbatim.
            decoded = maybe_parse_json(result)
            if isinstance(decoded, (dict, list)):
                print(json.dumps(decoded, indent=2))
            else:
                print(result)
            tail_log_if_requested(result, args.tail_log)
        else:
            print(json.dumps(result, indent=2, default=str))
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Create the argument parser with its ``list`` and ``call`` subcommands.

    Each subcommand stores its handler under ``func`` so the caller can
    dispatch with ``args.func(args, servers)``.
    """
    root = argparse.ArgumentParser(description="Interact with MCP servers via mcp2py")
    commands = root.add_subparsers(dest="command", required=True)

    # --- list ---------------------------------------------------------------
    listing = commands.add_parser("list", help="List configured MCP servers")
    listing.set_defaults(func=command_list)
    listing.add_argument("mcp", nargs="?", help="Name of a specific MCP to display")
    listing.add_argument("--schema", action="store_true", help="Print raw JSON schema for each tool")

    # --- call ---------------------------------------------------------------
    calling = commands.add_parser("call", help="Invoke a tool on an MCP server")
    calling.set_defaults(func=command_call)
    calling.add_argument(
        "selector",
        nargs="?",
        help="Optional shorthand '<mcp>.<tool>' (dot/colon/slash) or tool name",
    )
    plain_options = (
        ("--mcp", "MCP name from config/mcp_servers.json"),
        ("--tool", "Tool name when selector omits it"),
        ("--server", "Override MCP command/URL"),
        ("--args", "JSON object with tool arguments"),
    )
    for flag, description in plain_options:
        calling.add_argument(flag, help=description)
    calling.add_argument(
        "--tail-log",
        action="store_true",
        help="Tail log files returned by tools such as get_logs",
    )
    # Declared after `selector` so the first positional fills the selector.
    calling.add_argument(
        "call_args",
        nargs="*",
        help="Optional key=value arguments (function-style) appended after the selector",
    )
    return root
def main() -> int:
    """Parse the command line, load the server config and run the chosen subcommand.

    Returns:
        The exit status produced by the subcommand handler.
    """
    options = build_parser().parse_args()
    configured = load_servers()
    handler = options.func
    return handler(options, configured)
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env bash | |
# Generic wrapper for stdio MCP servers.
#
# Usage: mcp_stdio_wrapper.sh <command> [args...]
#
# Changes to the parent of this script's directory so commands resolve
# relative to it, then replaces this shell with the given command.
# Set MCP_STDIO_SILENT=1 to discard the command's stdout and stderr.
set -euo pipefail

# Without a command, `exec` would do nothing and the script would exit 0,
# hiding the mistake from the caller. Fail loudly instead.
if (( $# == 0 )); then
  echo "usage: $(basename "$0") <command> [args...]" >&2
  exit 2
fi

cd "$(dirname "$0")/.."

if [[ "${MCP_STDIO_SILENT:-0}" == "1" ]]; then
  exec "$@" >/dev/null 2>&1
else
  exec "$@"
fi
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment