-
-
Save ochafik/5d4328d2cc6573e4de465592dcceaf04 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # ruff: noqa: T201 | |
| """ | |
| MCP Server Auth Discovery Probe Tool. | |
| This script probes MCP server endpoints to classify their auth mechanisms, | |
| specifically checking for: | |
| - WWW-Authenticate header content (resource_metadata URL, scope) | |
| - Protected Resource Metadata (PRM) availability | |
| - OAuth/OIDC Authorization Server Metadata location | |
| Usage: | |
| python -m api.toolbox.mcp_auth_probe https://server1.example.com/mcp https://server2.example.com/mcp | |
| """ | |
| import argparse | |
| import asyncio | |
| import json | |
| import re | |
| from dataclasses import dataclass, field | |
| from datetime import UTC, datetime | |
| from typing import Any | |
| from urllib.parse import urljoin, urlparse | |
| import httpx | |
# Name of the HTTP header that carries the MCP protocol version; it is sent on
# every probe request and read back from the initial probe response.
MCP_PROTOCOL_VERSION = "mcp-protocol-version"

# Protocol version value this tool sends in the header above.
LATEST_PROTOCOL_VERSION = "2025-06-18"

# Value passed as ``timeout=`` to every httpx request made by this tool.
HTTP_TIMEOUT = 15.0
| @dataclass | |
| class WwwAuthenticateInfo: | |
| """Information extracted from WWW-Authenticate header.""" | |
| raw_header: str | None = None | |
| auth_scheme: str | None = None | |
| resource_metadata_url: str | None = None | |
| scope: str | None = None | |
| realm: str | None = None | |
| error: str | None = None | |
| error_description: str | None = None | |
| @dataclass | |
| class DiscoveryAttempt: | |
| """Result of a single discovery URL attempt.""" | |
| url: str | |
| status_code: int | None = None | |
| success: bool = False | |
| error: str | None = None | |
| content: dict[str, Any] | None = None | |
| content_type: str | None = None | |
| def _empty_discovery_attempts() -> "list[DiscoveryAttempt]": | |
| return [] | |
| def _empty_str_list() -> list[str]: | |
| return [] | |
@dataclass
class ProtectedResourceMetadataInfo:
    """Outcome of Protected Resource Metadata (PRM) discovery."""

    # True once one of the candidate URLs returned usable metadata.
    available: bool = False
    # Every URL that was tried, in the order it was tried.
    discovery_attempts: "list[DiscoveryAttempt]" = field(default_factory=_empty_discovery_attempts)
    # The candidate URL that produced the metadata, if any.
    successful_url: str | None = None
    # ``resource`` value copied from the metadata document.
    resource: str | None = None
    # ``authorization_servers`` value copied from the metadata document.
    authorization_servers: list[str] = field(default_factory=_empty_str_list)
    # ``scopes_supported`` value copied from the metadata document.
    scopes_supported: list[str] = field(default_factory=_empty_str_list)
    # ``bearer_methods_supported`` value copied from the metadata document.
    bearer_methods_supported: list[str] = field(default_factory=_empty_str_list)
    # The complete metadata document as parsed from JSON.
    raw_metadata: dict[str, Any] | None = None
@dataclass
class AuthServerMetadataInfo:
    """Outcome of OAuth/OIDC Authorization Server Metadata discovery."""

    # True once one of the candidate URLs returned usable metadata.
    available: bool = False
    # Every URL that was tried, in the order it was tried.
    discovery_attempts: "list[DiscoveryAttempt]" = field(default_factory=_empty_discovery_attempts)
    # The candidate URL that produced the metadata, if any.
    successful_url: str | None = None
    # The fields below are copied from the same-named keys of the metadata
    # document.
    issuer: str | None = None
    authorization_endpoint: str | None = None
    token_endpoint: str | None = None
    registration_endpoint: str | None = None
    scopes_supported: list[str] = field(default_factory=_empty_str_list)
    response_types_supported: list[str] = field(default_factory=_empty_str_list)
    grant_types_supported: list[str] = field(default_factory=_empty_str_list)
    code_challenge_methods_supported: list[str] = field(default_factory=_empty_str_list)
    # The complete metadata document as parsed from JSON.
    raw_metadata: dict[str, Any] | None = None
    # Label of the candidate URL that succeeded, as assigned by
    # build_auth_server_metadata_discovery_urls: "oauth-legacy", "oauth-path",
    # "oidc-path-8414", "oidc-path-1.0", "oauth-root" or "oidc-root".
    discovery_type: str | None = None
@dataclass
class InitialProbeResult:
    """Outcome of the first, unauthenticated request to the MCP endpoint."""

    # HTTP status of the response; ``None`` when the request itself failed.
    status_code: int | None = None
    # Set to True by the prober when the server answered 401.
    requires_auth: bool = False
    # Failure description (unexpected status, timeout, connection error, ...).
    error: str | None = None
    # Parsed WWW-Authenticate header; populated only for 401 responses.
    www_authenticate: WwwAuthenticateInfo | None = None
    # Value of the ``mcp-protocol-version`` response header, if present.
    mcp_protocol_version_response: str | None = None
@dataclass
class McpServerAuthProbeResult:
    """Everything learned about one MCP server during an auth probe."""

    # URL that was probed.
    server_url: str
    # When the probe started, as an ISO-8601 string.
    probe_timestamp: str
    # Result of the initial unauthenticated request.
    initial_probe: InitialProbeResult | None = None
    # Result of PRM discovery; left as ``None`` when discovery was not run.
    protected_resource_metadata: ProtectedResourceMetadataInfo | None = None
    # Result of authorization-server metadata discovery; ``None`` when not run.
    auth_server_metadata: AuthServerMetadataInfo | None = None
    # Classification label; stays "unknown" until one is assigned.
    classification: str = "unknown"
    # Human-readable observations collected while probing/classifying.
    notes: list[str] = field(default_factory=_empty_str_list)
| def extract_field_from_www_auth(www_auth_header: str, field_name: str) -> str | None: | |
| """Extract field from WWW-Authenticate header.""" | |
| pattern = rf'{field_name}=(?:"([^"]+)"|([^\s,]+))' | |
| match = re.search(pattern, www_auth_header, re.IGNORECASE) | |
| if match: | |
| return match.group(1) or match.group(2) | |
| return None | |
def parse_www_authenticate(response: httpx.Response) -> WwwAuthenticateInfo:
    """Build a ``WwwAuthenticateInfo`` from a response's WWW-Authenticate header.

    Returns an all-``None`` info object when the header is missing or empty.
    Otherwise the raw value, the leading scheme word and the
    ``resource_metadata``, ``scope``, ``realm``, ``error`` and
    ``error_description`` parameters are recorded.
    """
    header_value = response.headers.get("WWW-Authenticate")
    if not header_value:
        return WwwAuthenticateInfo()

    # The scheme is the first run of word characters at the start of the value.
    scheme = re.match(r"^(\w+)", header_value)
    return WwwAuthenticateInfo(
        raw_header=header_value,
        auth_scheme=scheme.group(1) if scheme else None,
        resource_metadata_url=extract_field_from_www_auth(
            header_value, "resource_metadata"
        ),
        scope=extract_field_from_www_auth(header_value, "scope"),
        realm=extract_field_from_www_auth(header_value, "realm"),
        error=extract_field_from_www_auth(header_value, "error"),
        error_description=extract_field_from_www_auth(
            header_value, "error_description"
        ),
    )
| def build_prm_discovery_urls(www_auth_url: str | None, server_url: str) -> list[str]: | |
| """Build ordered list of URLs for PRM discovery (SEP-985).""" | |
| urls: list[str] = [] | |
| if www_auth_url: | |
| urls.append(www_auth_url) | |
| parsed = urlparse(server_url) | |
| base_url = f"{parsed.scheme}://{parsed.netloc}" | |
| if parsed.path and parsed.path != "/": | |
| path_based_url = urljoin( | |
| base_url, f"/.well-known/oauth-protected-resource{parsed.path}" | |
| ) | |
| urls.append(path_based_url) | |
| root_based_url = urljoin(base_url, "/.well-known/oauth-protected-resource") | |
| urls.append(root_based_url) | |
| return urls | |
| def build_auth_server_metadata_discovery_urls( | |
| auth_server_url: str | None, server_url: str | |
| ) -> list[tuple[str, str]]: | |
| """Build ordered list of (url, type) for auth server metadata discovery.""" | |
| urls: list[tuple[str, str]] = [] | |
| if not auth_server_url: | |
| parsed = urlparse(server_url) | |
| return [ | |
| ( | |
| f"{parsed.scheme}://{parsed.netloc}/.well-known/oauth-authorization-server", | |
| "oauth-legacy", | |
| ) | |
| ] | |
| parsed = urlparse(auth_server_url) | |
| base_url = f"{parsed.scheme}://{parsed.netloc}" | |
| if parsed.path and parsed.path != "/": | |
| oauth_path = f"/.well-known/oauth-authorization-server{parsed.path.rstrip('/')}" | |
| urls.append((urljoin(base_url, oauth_path), "oauth-path")) | |
| oidc_path = f"/.well-known/openid-configuration{parsed.path.rstrip('/')}" | |
| urls.append((urljoin(base_url, oidc_path), "oidc-path-8414")) | |
| oidc_path_1_0 = f"{parsed.path.rstrip('/')}/.well-known/openid-configuration" | |
| urls.append((urljoin(base_url, oidc_path_1_0), "oidc-path-1.0")) | |
| urls.append( | |
| (urljoin(base_url, "/.well-known/oauth-authorization-server"), "oauth-root") | |
| ) | |
| urls.append((urljoin(base_url, "/.well-known/openid-configuration"), "oidc-root")) | |
| return urls | |
async def fetch_url(
    client: httpx.AsyncClient, url: str, method: str = "GET"
) -> DiscoveryAttempt:
    """Request ``url`` once and describe the outcome as a ``DiscoveryAttempt``.

    Never raises: every failure is reported through ``attempt.error``.

    Args:
        client: Shared httpx async client.
        url: URL to request.
        method: ``"POST"`` sends an empty JSON object; anything else issues a GET.

    Returns:
        An attempt whose ``success`` is True only when the server answered
        200 with a JSON *object*; ``content`` then holds that object.
    """
    attempt = DiscoveryAttempt(url=url)
    try:
        headers = {MCP_PROTOCOL_VERSION: LATEST_PROTOCOL_VERSION}
        if method == "POST":
            headers["Content-Type"] = "application/json"
            response = await client.post(
                url, headers=headers, json={}, timeout=HTTP_TIMEOUT
            )
        else:
            response = await client.get(url, headers=headers, timeout=HTTP_TIMEOUT)

        attempt.status_code = response.status_code
        attempt.content_type = response.headers.get("Content-Type")

        if response.status_code == 200:
            try:
                payload = response.json()
            except json.JSONDecodeError as e:
                attempt.error = f"Invalid JSON: {e}"
            else:
                # Metadata documents are JSON objects. Callers use
                # ``content.get(...)``, so a list/string/number body must be
                # reported as a failure instead of crashing them later.
                if isinstance(payload, dict):
                    attempt.content = payload
                    attempt.success = True
                else:
                    attempt.error = (
                        f"Unexpected JSON type: {type(payload).__name__} "
                        "(expected object)"
                    )
        elif response.status_code == 404:
            attempt.error = "Not found"
        else:
            attempt.error = f"HTTP {response.status_code}"
    except httpx.TimeoutException:
        attempt.error = "Request timeout"
    except httpx.ConnectError as e:
        attempt.error = f"Connection error: {e}"
    except Exception as e:
        # Best-effort probe: record anything unexpected rather than abort.
        attempt.error = f"Error: {type(e).__name__}: {e}"
    return attempt
async def probe_initial_request(
    client: httpx.AsyncClient, server_url: str
) -> InitialProbeResult:
    """POST an empty JSON body to the MCP endpoint and record how it reacts.

    Never raises: failures are reported through ``result.error``.

    Args:
        client: Shared httpx async client.
        server_url: URL of the MCP endpoint.

    Returns:
        A result where a 401 sets ``requires_auth`` and ``www_authenticate``,
        a 200 means no authentication is needed, and any other status is
        recorded as an error.
    """
    result = InitialProbeResult()
    try:
        headers = {
            MCP_PROTOCOL_VERSION: LATEST_PROTOCOL_VERSION,
            "Content-Type": "application/json",
            # MCP Streamable HTTP requires clients to accept both media
            # types on POST; without this a strict server may answer 406
            # before it ever gets to the authentication check.
            "Accept": "application/json, text/event-stream",
        }
        response = await client.post(
            server_url, headers=headers, json={}, timeout=HTTP_TIMEOUT
        )
        result.status_code = response.status_code
        result.mcp_protocol_version_response = response.headers.get(
            MCP_PROTOCOL_VERSION
        )
        if response.status_code == 401:
            result.requires_auth = True
            result.www_authenticate = parse_www_authenticate(response)
        elif response.status_code == 200:
            result.requires_auth = False
        else:
            result.error = f"Unexpected status: {response.status_code}"
    except httpx.TimeoutException:
        result.error = "Request timeout"
    except httpx.ConnectError as e:
        result.error = f"Connection error: {e}"
    except Exception as e:
        # Best-effort probe: record anything unexpected rather than abort.
        result.error = f"Error: {type(e).__name__}: {e}"
    return result
async def discover_prm(
    client: httpx.AsyncClient, www_auth_url: str | None, server_url: str
) -> ProtectedResourceMetadataInfo:
    """Try each PRM candidate URL in order and stop at the first usable one.

    Every attempt, successful or not, is appended to
    ``discovery_attempts``. A candidate is usable when its fetch succeeded
    and returned non-empty content.
    """
    prm = ProtectedResourceMetadataInfo()
    for candidate in build_prm_discovery_urls(www_auth_url, server_url):
        attempt = await fetch_url(client, candidate)
        prm.discovery_attempts.append(attempt)

        metadata = attempt.content
        if not (attempt.success and metadata):
            continue

        prm.available = True
        prm.successful_url = candidate
        prm.raw_metadata = metadata
        prm.resource = metadata.get("resource")
        prm.authorization_servers = metadata.get("authorization_servers", [])
        prm.scopes_supported = metadata.get("scopes_supported", [])
        prm.bearer_methods_supported = metadata.get("bearer_methods_supported", [])
        break
    return prm
async def discover_auth_server_metadata(
    client: httpx.AsyncClient, prm_auth_server_url: str | None, server_url: str
) -> AuthServerMetadataInfo:
    """Try each authorization-server metadata candidate until one is usable.

    Every attempt is appended to ``discovery_attempts``. On the first
    candidate whose fetch succeeded with non-empty content, the well-known
    fields are copied onto the result along with the candidate's label.
    """
    asm = AuthServerMetadataInfo()
    candidates = build_auth_server_metadata_discovery_urls(
        prm_auth_server_url, server_url
    )
    for candidate, label in candidates:
        attempt = await fetch_url(client, candidate)
        asm.discovery_attempts.append(attempt)

        metadata = attempt.content
        if not (attempt.success and metadata):
            continue

        asm.available = True
        asm.successful_url = candidate
        asm.raw_metadata = metadata
        asm.discovery_type = label
        asm.issuer = metadata.get("issuer")
        asm.authorization_endpoint = metadata.get("authorization_endpoint")
        asm.token_endpoint = metadata.get("token_endpoint")
        asm.registration_endpoint = metadata.get("registration_endpoint")
        asm.scopes_supported = metadata.get("scopes_supported", [])
        asm.response_types_supported = metadata.get("response_types_supported", [])
        asm.grant_types_supported = metadata.get("grant_types_supported", [])
        asm.code_challenge_methods_supported = metadata.get(
            "code_challenge_methods_supported", []
        )
        break
    return asm
def _prm_url_failure_note(
    prm: "ProtectedResourceMetadataInfo | None", advertised_url: str
) -> str:
    """Describe why the URL advertised in WWW-Authenticate produced no PRM."""
    attempts = prm.discovery_attempts if prm is not None else []
    attempt = next((a for a in attempts if a.url == advertised_url), None)
    if attempt is not None and attempt.status_code == 404:
        return " WARNING: WWW-Authenticate resource_metadata URL returned 404!"
    if attempt is None:
        # Nothing recorded for that URL, so no specific cause can be claimed.
        return " WARNING: WWW-Authenticate resource_metadata URL did not yield usable PRM"
    reason = attempt.error or f"HTTP {attempt.status_code}"
    return (
        " WARNING: WWW-Authenticate resource_metadata URL did not yield "
        f"usable PRM ({reason})"
    )


def classify_server(result: "McpServerAuthProbeResult") -> str:
    """Classify a probed server and attach explanatory notes to ``result``.

    Side effect: ``result.notes`` is replaced with the notes gathered here,
    except on the two early returns (``"error"``/``"no-auth-required"``),
    which leave it untouched.

    Args:
        result: Probe result with the initial probe and discovery outcomes.

    Returns:
        One of ``"error"``, ``"no-auth-required"``,
        ``"full-oauth-2025-06-18"``, ``"prm-only"``,
        ``"legacy-oauth-2025-03-26"``, ``"auth-required-no-metadata"`` or
        ``"unknown"``.
    """
    notes: list[str] = []
    probe = result.initial_probe
    if probe and probe.error:
        return "error"
    if probe and not probe.requires_auth:
        return "no-auth-required"

    prm = result.protected_resource_metadata
    auth = result.auth_server_metadata
    has_prm = bool(prm and prm.available)
    has_auth_meta = bool(auth and auth.available)
    www_auth = probe.www_authenticate if probe else None

    if www_auth and www_auth.resource_metadata_url:
        notes.append(
            f"WWW-Authenticate contains resource_metadata URL: {www_auth.resource_metadata_url}"
        )
        if has_prm:
            assert prm is not None  # narrowing for pyright
            if prm.successful_url != www_auth.resource_metadata_url:
                notes.append(
                    f" WARNING: PRM was NOT found at WWW-Authenticate URL, "
                    f"but at fallback: {prm.successful_url}"
                )
            else:
                notes.append(" PRM found at WWW-Authenticate URL (as expected)")
        else:
            # Report the real failure (timeout, 5xx, bad JSON, ...) instead
            # of always claiming a 404.
            notes.append(_prm_url_failure_note(prm, www_auth.resource_metadata_url))

    if www_auth and www_auth.scope:
        notes.append(f"WWW-Authenticate contains scope: {www_auth.scope}")
    if www_auth and www_auth.error:
        notes.append(f"WWW-Authenticate error: {www_auth.error}")
    if www_auth and www_auth.error_description:
        notes.append(
            f"WWW-Authenticate error_description: {www_auth.error_description}"
        )

    if has_prm:
        assert prm is not None  # narrowing for pyright
        if prm.resource:
            notes.append(f"PRM resource: {prm.resource}")
        if prm.authorization_servers:
            notes.append(f"PRM authorization_servers: {prm.authorization_servers}")
        if prm.scopes_supported:
            notes.append(f"PRM scopes_supported: {prm.scopes_supported}")
        if prm.bearer_methods_supported:
            notes.append(
                f"PRM bearer_methods_supported: {prm.bearer_methods_supported}"
            )

    if has_auth_meta:
        assert auth is not None  # narrowing for pyright
        notes.append(
            f"Auth metadata discovered via '{auth.discovery_type}' at {auth.successful_url}"
        )
        if auth.registration_endpoint:
            notes.append("Supports dynamic client registration (DCR)")
        if auth.code_challenge_methods_supported:
            notes.append(f"PKCE methods: {auth.code_challenge_methods_supported}")
        if auth.grant_types_supported:
            notes.append(f"Grant types: {auth.grant_types_supported}")

    result.notes = notes

    if has_prm and has_auth_meta:
        return "full-oauth-2025-06-18"
    if has_prm:
        return "prm-only"
    if has_auth_meta:
        return "legacy-oauth-2025-03-26"
    if probe and probe.requires_auth:
        return "auth-required-no-metadata"
    return "unknown"
async def probe_mcp_server(server_url: str) -> McpServerAuthProbeResult:
    """Run the full auth-discovery probe against one MCP server.

    Steps: the initial unauthenticated request, then (only when the server
    answered 401) PRM discovery, authorization-server metadata discovery
    and classification. A failed initial probe or a server that needs no
    authentication short-circuits with the matching classification.
    """
    outcome = McpServerAuthProbeResult(
        server_url=server_url,
        probe_timestamp=datetime.now(UTC).isoformat(),
    )
    async with httpx.AsyncClient(follow_redirects=True) as client:
        probe = await probe_initial_request(client, server_url)
        outcome.initial_probe = probe

        if probe.error:
            outcome.classification = "error"
            outcome.notes.append(f"Initial probe failed: {probe.error}")
            return outcome

        if not probe.requires_auth:
            outcome.classification = "no-auth-required"
            outcome.notes.append("Server does not require authentication")
            return outcome

        # Prefer the PRM location the server advertised, when it did so.
        header_info = probe.www_authenticate
        advertised_prm_url = header_info.resource_metadata_url if header_info else None
        prm = await discover_prm(client, advertised_prm_url, server_url)
        outcome.protected_resource_metadata = prm

        # Only the first authorization server listed in the PRM is probed.
        issuer_url = None
        if prm.available and prm.authorization_servers:
            issuer_url = prm.authorization_servers[0]
        outcome.auth_server_metadata = await discover_auth_server_metadata(
            client, issuer_url, server_url
        )

        outcome.classification = classify_server(outcome)
        return outcome
async def probe_multiple_servers(
    server_urls: list[str],
) -> list[McpServerAuthProbeResult]:
    """Probe all given servers concurrently.

    Results come back in the same order as ``server_urls``.
    """
    pending = (probe_mcp_server(target) for target in server_urls)
    return await asyncio.gather(*pending)
def _attempts_to_dicts(attempts: "list[DiscoveryAttempt]") -> list[dict[str, Any]]:
    """Serialize discovery attempts to plain dictionaries."""
    return [
        {
            "url": a.url,
            "status_code": a.status_code,
            "success": a.success,
            "error": a.error,
        }
        for a in attempts
    ]


def result_to_dict(
    result: "McpServerAuthProbeResult", include_raw: bool = False
) -> dict[str, Any]:
    """Convert a probe result into a JSON-serializable dictionary.

    Sections for the initial probe, PRM and authorization-server metadata
    are included only when the corresponding attribute is set.

    Args:
        result: Probe result to serialize.
        include_raw: When True, add the raw metadata documents (if any)
            under ``raw_metadata`` in the PRM and auth-server sections.

    Returns:
        A dictionary built from plain values taken from ``result``.
    """
    d: dict[str, Any] = {
        "server_url": result.server_url,
        "probe_timestamp": result.probe_timestamp,
        "classification": result.classification,
        "notes": result.notes,
    }

    if result.initial_probe:
        ip = result.initial_probe
        ip_dict: dict[str, Any] = {
            "status_code": ip.status_code,
            "requires_auth": ip.requires_auth,
            "error": ip.error,
            "mcp_protocol_version_response": ip.mcp_protocol_version_response,
        }
        if ip.www_authenticate:
            wa = ip.www_authenticate
            ip_dict["www_authenticate"] = {
                "raw_header": wa.raw_header,
                "auth_scheme": wa.auth_scheme,
                "resource_metadata_url": wa.resource_metadata_url,
                "scope": wa.scope,
                "realm": wa.realm,
                "error": wa.error,
                # Parsed from the header but previously missing from output.
                "error_description": wa.error_description,
            }
        d["initial_probe"] = ip_dict

    if result.protected_resource_metadata:
        prm = result.protected_resource_metadata
        prm_dict: dict[str, Any] = {
            "available": prm.available,
            "discovery_attempts": _attempts_to_dicts(prm.discovery_attempts),
            "successful_url": prm.successful_url,
            "resource": prm.resource,
            "authorization_servers": prm.authorization_servers,
            "scopes_supported": prm.scopes_supported,
            "bearer_methods_supported": prm.bearer_methods_supported,
        }
        if include_raw and prm.raw_metadata:
            prm_dict["raw_metadata"] = prm.raw_metadata
        d["protected_resource_metadata"] = prm_dict

    if result.auth_server_metadata:
        asm = result.auth_server_metadata
        asm_dict: dict[str, Any] = {
            "available": asm.available,
            "discovery_type": asm.discovery_type,
            "discovery_attempts": _attempts_to_dicts(asm.discovery_attempts),
            "successful_url": asm.successful_url,
            "issuer": asm.issuer,
            "authorization_endpoint": asm.authorization_endpoint,
            "token_endpoint": asm.token_endpoint,
            "registration_endpoint": asm.registration_endpoint,
            "scopes_supported": asm.scopes_supported,
            # Collected during discovery but previously missing from output.
            "response_types_supported": asm.response_types_supported,
            "grant_types_supported": asm.grant_types_supported,
            "code_challenge_methods_supported": asm.code_challenge_methods_supported,
        }
        if include_raw and asm.raw_metadata:
            asm_dict["raw_metadata"] = asm.raw_metadata
        d["auth_server_metadata"] = asm_dict

    return d
def _print_discovery_attempts(attempts: "list[DiscoveryAttempt]") -> None:
    """Print one line per discovery attempt, plus its error when it failed."""
    print(" Discovery Attempts:")
    for attempt in attempts:
        status = f"[{attempt.status_code}]" if attempt.status_code else "[--]"
        success_marker = "✓" if attempt.success else "✗"
        print(f" {success_marker} {status} {attempt.url}")
        if attempt.error and not attempt.success:
            print(f" Error: {attempt.error}")


def print_summary(results: "list[McpServerAuthProbeResult]") -> None:
    """Print a human-readable report for every result, then a tally.

    For each server: classification, notes, the initial probe, and the
    PRM / authorization-server metadata sections that are present. The
    report ends with a count of servers per classification.

    Args:
        results: Probe results to report on.
    """
    print("\n" + "=" * 80)
    print("MCP SERVER AUTH DISCOVERY PROBE SUMMARY")
    print("=" * 80)

    for result in results:
        print(f"\n{'-' * 80}")
        print(f"Server: {result.server_url}")
        print(f"Classification: {result.classification}")
        print(f"Timestamp: {result.probe_timestamp}")

        if result.notes:
            print("\nNotes:")
            for note in result.notes:
                print(f" - {note}")

        if result.initial_probe:
            ip = result.initial_probe
            print("\nInitial Probe:")
            print(f" Status: {ip.status_code}")
            print(f" Requires Auth: {ip.requires_auth}")
            if ip.error:
                print(f" Error: {ip.error}")
            if ip.www_authenticate and ip.www_authenticate.raw_header:
                header = ip.www_authenticate.raw_header
                # Only mark the header as truncated when it really was.
                shown = header if len(header) <= 100 else f"{header[:100]}..."
                print(f" WWW-Authenticate: {shown}")
                if ip.www_authenticate.resource_metadata_url:
                    print(
                        f" resource_metadata: {ip.www_authenticate.resource_metadata_url}"
                    )
                if ip.www_authenticate.scope:
                    print(f" scope: {ip.www_authenticate.scope}")

        if result.protected_resource_metadata:
            prm = result.protected_resource_metadata
            print("\nProtected Resource Metadata:")
            print(f" Available: {prm.available}")
            if prm.successful_url:
                print(f" Successful URL: {prm.successful_url}")
            if prm.resource:
                print(f" Resource: {prm.resource}")
            if prm.authorization_servers:
                print(f" Authorization Servers: {prm.authorization_servers}")
            if prm.scopes_supported:
                print(f" Scopes Supported: {prm.scopes_supported}")
            _print_discovery_attempts(prm.discovery_attempts)

        if result.auth_server_metadata:
            asm = result.auth_server_metadata
            print("\nAuth Server Metadata:")
            print(f" Available: {asm.available}")
            if asm.discovery_type:
                print(f" Discovery Type: {asm.discovery_type}")
            if asm.successful_url:
                print(f" Successful URL: {asm.successful_url}")
            if asm.issuer:
                print(f" Issuer: {asm.issuer}")
            if asm.authorization_endpoint:
                print(f" Authorization Endpoint: {asm.authorization_endpoint}")
            if asm.token_endpoint:
                print(f" Token Endpoint: {asm.token_endpoint}")
            if asm.registration_endpoint:
                print(f" Registration Endpoint: {asm.registration_endpoint}")
            if asm.scopes_supported:
                print(f" Scopes Supported: {asm.scopes_supported}")
            if asm.code_challenge_methods_supported:
                print(f" PKCE Methods: {asm.code_challenge_methods_supported}")
            _print_discovery_attempts(asm.discovery_attempts)

    print("\n" + "=" * 80)
    classifications: dict[str, int] = {}
    for r in results:
        classifications[r.classification] = classifications.get(r.classification, 0) + 1
    print("Classification Summary:")
    for cls, count in sorted(classifications.items()):
        print(f" {cls}: {count}")
    print("=" * 80 + "\n")
def main() -> None:
    """Command-line entry point: parse arguments, probe, print the results.

    Output is JSON when ``--json`` or ``--json-pretty`` is given (the
    latter indented by two spaces); otherwise the human-readable summary.
    """
    examples_and_legend = """
Examples:
%(prog)s https://microsoft365.mcp.claude.com/mcp
%(prog)s https://server1.example.com/mcp https://server2.example.com/mcp
%(prog)s --json https://server.example.com/mcp > results.json
Classifications:
full-oauth-2025-06-18 - Has PRM and auth server metadata (latest MCP spec)
legacy-oauth-2025-03-26 - Has auth server metadata but no PRM (older MCP spec)
prm-only - Has PRM but no discoverable auth server metadata
no-auth-required - Server does not require authentication
auth-required-no-metadata - Server requires auth but no metadata discoverable
error - Connection or request error
unknown - Could not determine classification
"""
    parser = argparse.ArgumentParser(
        description="Probe MCP server endpoints to classify their auth mechanisms.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples_and_legend,
    )
    parser.add_argument("urls", nargs="+", help="MCP server URLs to probe")
    parser.add_argument("--json", action="store_true", help="Output results as JSON")
    parser.add_argument(
        "--json-pretty",
        action="store_true",
        help="Output results as pretty-printed JSON",
    )
    parser.add_argument(
        "--include-raw", action="store_true", help="Include raw metadata in JSON output"
    )
    args = parser.parse_args()

    results = asyncio.run(probe_multiple_servers(args.urls))

    wants_json = args.json or args.json_pretty
    if not wants_json:
        print_summary(results)
        return

    payload = [result_to_dict(r, include_raw=args.include_raw) for r in results]
    # --json-pretty wins when both flags are given.
    indent = 2 if args.json_pretty else None
    print(json.dumps(payload, indent=indent))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment