Skip to content

Instantly share code, notes, and snippets.

@ochafik
Created November 25, 2025 20:00
Show Gist options
  • Select an option

  • Save ochafik/5d4328d2cc6573e4de465592dcceaf04 to your computer and use it in GitHub Desktop.

Select an option

Save ochafik/5d4328d2cc6573e4de465592dcceaf04 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# ruff: noqa: T201
"""
MCP Server Auth Discovery Probe Tool.
This script probes MCP server endpoints to classify their auth mechanisms,
specifically checking for:
- WWW-Authenticate header content (resource_metadata URL, scope)
- Protected Resource Metadata (PRM) availability
- OAuth/OIDC Authorization Server Metadata location
Usage:
python -m api.toolbox.mcp_auth_probe https://server1.example.com/mcp https://server2.example.com/mcp
"""
import argparse
import asyncio
import json
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from urllib.parse import urljoin, urlparse
import httpx
MCP_PROTOCOL_VERSION = "mcp-protocol-version"
LATEST_PROTOCOL_VERSION = "2025-06-18"
HTTP_TIMEOUT = 15.0
@dataclass
class WwwAuthenticateInfo:
"""Information extracted from WWW-Authenticate header."""
raw_header: str | None = None
auth_scheme: str | None = None
resource_metadata_url: str | None = None
scope: str | None = None
realm: str | None = None
error: str | None = None
error_description: str | None = None
@dataclass
class DiscoveryAttempt:
"""Result of a single discovery URL attempt."""
url: str
status_code: int | None = None
success: bool = False
error: str | None = None
content: dict[str, Any] | None = None
content_type: str | None = None
def _empty_discovery_attempts() -> "list[DiscoveryAttempt]":
return []
def _empty_str_list() -> list[str]:
return []
@dataclass
class ProtectedResourceMetadataInfo:
"""Information about Protected Resource Metadata discovery."""
available: bool = False
discovery_attempts: "list[DiscoveryAttempt]" = field(
default_factory=_empty_discovery_attempts
)
successful_url: str | None = None
resource: str | None = None
authorization_servers: list[str] = field(default_factory=_empty_str_list)
scopes_supported: list[str] = field(default_factory=_empty_str_list)
bearer_methods_supported: list[str] = field(default_factory=_empty_str_list)
raw_metadata: dict[str, Any] | None = None
@dataclass
class AuthServerMetadataInfo:
"""Information about OAuth/OIDC Authorization Server Metadata discovery."""
available: bool = False
discovery_attempts: "list[DiscoveryAttempt]" = field(
default_factory=_empty_discovery_attempts
)
successful_url: str | None = None
issuer: str | None = None
authorization_endpoint: str | None = None
token_endpoint: str | None = None
registration_endpoint: str | None = None
scopes_supported: list[str] = field(default_factory=_empty_str_list)
response_types_supported: list[str] = field(default_factory=_empty_str_list)
grant_types_supported: list[str] = field(default_factory=_empty_str_list)
code_challenge_methods_supported: list[str] = field(default_factory=_empty_str_list)
raw_metadata: dict[str, Any] | None = None
discovery_type: str | None = None # "oauth" or "oidc"
@dataclass
class InitialProbeResult:
"""Result of initial probe request."""
status_code: int | None = None
requires_auth: bool = False
error: str | None = None
www_authenticate: WwwAuthenticateInfo | None = None
mcp_protocol_version_response: str | None = None
@dataclass
class McpServerAuthProbeResult:
"""Complete auth discovery probe result for an MCP server."""
server_url: str
probe_timestamp: str
initial_probe: InitialProbeResult | None = None
protected_resource_metadata: ProtectedResourceMetadataInfo | None = None
auth_server_metadata: AuthServerMetadataInfo | None = None
classification: str = "unknown"
notes: list[str] = field(default_factory=_empty_str_list)
def extract_field_from_www_auth(www_auth_header: str, field_name: str) -> str | None:
"""Extract field from WWW-Authenticate header."""
pattern = rf'{field_name}=(?:"([^"]+)"|([^\s,]+))'
match = re.search(pattern, www_auth_header, re.IGNORECASE)
if match:
return match.group(1) or match.group(2)
return None
def parse_www_authenticate(response: httpx.Response) -> WwwAuthenticateInfo:
"""Parse WWW-Authenticate header from response."""
info = WwwAuthenticateInfo()
www_auth = response.headers.get("WWW-Authenticate")
if not www_auth:
return info
info.raw_header = www_auth
auth_scheme_match = re.match(r"^(\w+)", www_auth)
if auth_scheme_match:
info.auth_scheme = auth_scheme_match.group(1)
info.resource_metadata_url = extract_field_from_www_auth(
www_auth, "resource_metadata"
)
info.scope = extract_field_from_www_auth(www_auth, "scope")
info.realm = extract_field_from_www_auth(www_auth, "realm")
info.error = extract_field_from_www_auth(www_auth, "error")
info.error_description = extract_field_from_www_auth(www_auth, "error_description")
return info
def build_prm_discovery_urls(www_auth_url: str | None, server_url: str) -> list[str]:
"""Build ordered list of URLs for PRM discovery (SEP-985)."""
urls: list[str] = []
if www_auth_url:
urls.append(www_auth_url)
parsed = urlparse(server_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
if parsed.path and parsed.path != "/":
path_based_url = urljoin(
base_url, f"/.well-known/oauth-protected-resource{parsed.path}"
)
urls.append(path_based_url)
root_based_url = urljoin(base_url, "/.well-known/oauth-protected-resource")
urls.append(root_based_url)
return urls
def build_auth_server_metadata_discovery_urls(
auth_server_url: str | None, server_url: str
) -> list[tuple[str, str]]:
"""Build ordered list of (url, type) for auth server metadata discovery."""
urls: list[tuple[str, str]] = []
if not auth_server_url:
parsed = urlparse(server_url)
return [
(
f"{parsed.scheme}://{parsed.netloc}/.well-known/oauth-authorization-server",
"oauth-legacy",
)
]
parsed = urlparse(auth_server_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
if parsed.path and parsed.path != "/":
oauth_path = f"/.well-known/oauth-authorization-server{parsed.path.rstrip('/')}"
urls.append((urljoin(base_url, oauth_path), "oauth-path"))
oidc_path = f"/.well-known/openid-configuration{parsed.path.rstrip('/')}"
urls.append((urljoin(base_url, oidc_path), "oidc-path-8414"))
oidc_path_1_0 = f"{parsed.path.rstrip('/')}/.well-known/openid-configuration"
urls.append((urljoin(base_url, oidc_path_1_0), "oidc-path-1.0"))
urls.append(
(urljoin(base_url, "/.well-known/oauth-authorization-server"), "oauth-root")
)
urls.append((urljoin(base_url, "/.well-known/openid-configuration"), "oidc-root"))
return urls
async def fetch_url(
client: httpx.AsyncClient, url: str, method: str = "GET"
) -> DiscoveryAttempt:
"""Fetch a URL and return discovery attempt result."""
attempt = DiscoveryAttempt(url=url)
try:
headers = {MCP_PROTOCOL_VERSION: LATEST_PROTOCOL_VERSION}
if method == "POST":
headers["Content-Type"] = "application/json"
response = await client.post(
url, headers=headers, json={}, timeout=HTTP_TIMEOUT
)
else:
response = await client.get(url, headers=headers, timeout=HTTP_TIMEOUT)
attempt.status_code = response.status_code
attempt.content_type = response.headers.get("Content-Type")
if response.status_code == 200:
try:
attempt.content = response.json()
attempt.success = True
except json.JSONDecodeError as e:
attempt.error = f"Invalid JSON: {e}"
elif response.status_code == 404:
attempt.error = "Not found"
else:
attempt.error = f"HTTP {response.status_code}"
except httpx.TimeoutException:
attempt.error = "Request timeout"
except httpx.ConnectError as e:
attempt.error = f"Connection error: {e}"
except Exception as e:
attempt.error = f"Error: {type(e).__name__}: {e}"
return attempt
async def probe_initial_request(
client: httpx.AsyncClient, server_url: str
) -> InitialProbeResult:
"""Make initial POST request to trigger 401 and extract WWW-Authenticate."""
result = InitialProbeResult()
try:
headers = {
MCP_PROTOCOL_VERSION: LATEST_PROTOCOL_VERSION,
"Content-Type": "application/json",
}
response = await client.post(
server_url, headers=headers, json={}, timeout=HTTP_TIMEOUT
)
result.status_code = response.status_code
result.mcp_protocol_version_response = response.headers.get(
MCP_PROTOCOL_VERSION
)
if response.status_code == 401:
result.requires_auth = True
result.www_authenticate = parse_www_authenticate(response)
elif response.status_code == 200:
result.requires_auth = False
else:
result.error = f"Unexpected status: {response.status_code}"
except httpx.TimeoutException:
result.error = "Request timeout"
except httpx.ConnectError as e:
result.error = f"Connection error: {e}"
except Exception as e:
result.error = f"Error: {type(e).__name__}: {e}"
return result
async def discover_prm(
client: httpx.AsyncClient, www_auth_url: str | None, server_url: str
) -> ProtectedResourceMetadataInfo:
"""Discover Protected Resource Metadata."""
info = ProtectedResourceMetadataInfo()
urls = build_prm_discovery_urls(www_auth_url, server_url)
for url in urls:
attempt = await fetch_url(client, url)
info.discovery_attempts.append(attempt)
if attempt.success and attempt.content:
info.available = True
info.successful_url = url
info.raw_metadata = attempt.content
info.resource = attempt.content.get("resource")
info.authorization_servers = attempt.content.get(
"authorization_servers", []
)
info.scopes_supported = attempt.content.get("scopes_supported", [])
info.bearer_methods_supported = attempt.content.get(
"bearer_methods_supported", []
)
break
return info
async def discover_auth_server_metadata(
client: httpx.AsyncClient, prm_auth_server_url: str | None, server_url: str
) -> AuthServerMetadataInfo:
"""Discover OAuth/OIDC Authorization Server Metadata."""
info = AuthServerMetadataInfo()
urls = build_auth_server_metadata_discovery_urls(prm_auth_server_url, server_url)
for url, discovery_type in urls:
attempt = await fetch_url(client, url)
info.discovery_attempts.append(attempt)
if attempt.success and attempt.content:
info.available = True
info.successful_url = url
info.raw_metadata = attempt.content
info.discovery_type = discovery_type
info.issuer = attempt.content.get("issuer")
info.authorization_endpoint = attempt.content.get("authorization_endpoint")
info.token_endpoint = attempt.content.get("token_endpoint")
info.registration_endpoint = attempt.content.get("registration_endpoint")
info.scopes_supported = attempt.content.get("scopes_supported", [])
info.response_types_supported = attempt.content.get(
"response_types_supported", []
)
info.grant_types_supported = attempt.content.get(
"grant_types_supported", []
)
info.code_challenge_methods_supported = attempt.content.get(
"code_challenge_methods_supported", []
)
break
return info
def classify_server(result: McpServerAuthProbeResult) -> str:
"""Classify the server based on probe results."""
notes: list[str] = []
if result.initial_probe and result.initial_probe.error:
return "error"
if result.initial_probe and not result.initial_probe.requires_auth:
return "no-auth-required"
has_prm = (
result.protected_resource_metadata
and result.protected_resource_metadata.available
)
has_auth_meta = (
result.auth_server_metadata and result.auth_server_metadata.available
)
www_auth = result.initial_probe.www_authenticate if result.initial_probe else None
has_www_auth_prm_url = www_auth and www_auth.resource_metadata_url
has_www_auth_scope = www_auth and www_auth.scope
if has_www_auth_prm_url:
assert www_auth is not None # narrowing for pyright
notes.append(
f"WWW-Authenticate contains resource_metadata URL: {www_auth.resource_metadata_url}"
)
if has_prm:
prm = result.protected_resource_metadata
assert prm is not None
if prm.successful_url != www_auth.resource_metadata_url:
notes.append(
f" WARNING: PRM was NOT found at WWW-Authenticate URL, "
f"but at fallback: {prm.successful_url}"
)
else:
notes.append(" PRM found at WWW-Authenticate URL (as expected)")
else:
notes.append(
" WARNING: WWW-Authenticate resource_metadata URL returned 404!"
)
if has_www_auth_scope:
assert www_auth is not None # narrowing for pyright
notes.append(f"WWW-Authenticate contains scope: {www_auth.scope}")
if www_auth and www_auth.error:
notes.append(f"WWW-Authenticate error: {www_auth.error}")
if www_auth and www_auth.error_description:
notes.append(
f"WWW-Authenticate error_description: {www_auth.error_description}"
)
if has_prm:
prm = result.protected_resource_metadata
assert prm is not None
if prm.resource:
notes.append(f"PRM resource: {prm.resource}")
if prm.authorization_servers:
notes.append(f"PRM authorization_servers: {prm.authorization_servers}")
if prm.scopes_supported:
notes.append(f"PRM scopes_supported: {prm.scopes_supported}")
if prm.bearer_methods_supported:
notes.append(
f"PRM bearer_methods_supported: {prm.bearer_methods_supported}"
)
if has_auth_meta:
auth = result.auth_server_metadata
assert auth is not None
notes.append(
f"Auth metadata discovered via '{auth.discovery_type}' at {auth.successful_url}"
)
if auth.registration_endpoint:
notes.append("Supports dynamic client registration (DCR)")
if auth.code_challenge_methods_supported:
notes.append(f"PKCE methods: {auth.code_challenge_methods_supported}")
if auth.grant_types_supported:
notes.append(f"Grant types: {auth.grant_types_supported}")
result.notes = notes
if has_prm and has_auth_meta:
return "full-oauth-2025-06-18"
elif has_prm and not has_auth_meta:
return "prm-only"
elif not has_prm and has_auth_meta:
return "legacy-oauth-2025-03-26"
elif result.initial_probe and result.initial_probe.requires_auth:
return "auth-required-no-metadata"
else:
return "unknown"
async def probe_mcp_server(server_url: str) -> McpServerAuthProbeResult:
"""Probe a single MCP server for auth discovery information."""
result = McpServerAuthProbeResult(
server_url=server_url,
probe_timestamp=datetime.now(UTC).isoformat(),
)
async with httpx.AsyncClient(follow_redirects=True) as client:
result.initial_probe = await probe_initial_request(client, server_url)
if result.initial_probe.error:
result.classification = "error"
result.notes.append(f"Initial probe failed: {result.initial_probe.error}")
return result
if not result.initial_probe.requires_auth:
result.classification = "no-auth-required"
result.notes.append("Server does not require authentication")
return result
www_auth_prm_url = None
if result.initial_probe.www_authenticate:
www_auth_prm_url = (
result.initial_probe.www_authenticate.resource_metadata_url
)
result.protected_resource_metadata = await discover_prm(
client, www_auth_prm_url, server_url
)
prm_auth_server = None
if (
result.protected_resource_metadata.available
and result.protected_resource_metadata.authorization_servers
):
prm_auth_server = result.protected_resource_metadata.authorization_servers[
0
]
result.auth_server_metadata = await discover_auth_server_metadata(
client, prm_auth_server, server_url
)
result.classification = classify_server(result)
return result
async def probe_multiple_servers(
server_urls: list[str],
) -> list[McpServerAuthProbeResult]:
"""Probe multiple MCP servers in parallel."""
tasks = [probe_mcp_server(url) for url in server_urls]
return await asyncio.gather(*tasks)
def result_to_dict(
result: McpServerAuthProbeResult, include_raw: bool = False
) -> dict[str, Any]:
"""Convert probe result to dictionary for JSON serialization."""
d: dict[str, Any] = {
"server_url": result.server_url,
"probe_timestamp": result.probe_timestamp,
"classification": result.classification,
"notes": result.notes,
}
if result.initial_probe:
ip = result.initial_probe
ip_dict: dict[str, Any] = {
"status_code": ip.status_code,
"requires_auth": ip.requires_auth,
"error": ip.error,
"mcp_protocol_version_response": ip.mcp_protocol_version_response,
}
if ip.www_authenticate:
wa = ip.www_authenticate
ip_dict["www_authenticate"] = {
"raw_header": wa.raw_header,
"auth_scheme": wa.auth_scheme,
"resource_metadata_url": wa.resource_metadata_url,
"scope": wa.scope,
"realm": wa.realm,
"error": wa.error,
}
d["initial_probe"] = ip_dict
if result.protected_resource_metadata:
prm = result.protected_resource_metadata
prm_dict: dict[str, Any] = {
"available": prm.available,
"discovery_attempts": [
{
"url": a.url,
"status_code": a.status_code,
"success": a.success,
"error": a.error,
}
for a in prm.discovery_attempts
],
"successful_url": prm.successful_url,
"resource": prm.resource,
"authorization_servers": prm.authorization_servers,
"scopes_supported": prm.scopes_supported,
"bearer_methods_supported": prm.bearer_methods_supported,
}
if include_raw and prm.raw_metadata:
prm_dict["raw_metadata"] = prm.raw_metadata
d["protected_resource_metadata"] = prm_dict
if result.auth_server_metadata:
asm = result.auth_server_metadata
asm_dict: dict[str, Any] = {
"available": asm.available,
"discovery_type": asm.discovery_type,
"discovery_attempts": [
{
"url": a.url,
"status_code": a.status_code,
"success": a.success,
"error": a.error,
}
for a in asm.discovery_attempts
],
"successful_url": asm.successful_url,
"issuer": asm.issuer,
"authorization_endpoint": asm.authorization_endpoint,
"token_endpoint": asm.token_endpoint,
"registration_endpoint": asm.registration_endpoint,
"scopes_supported": asm.scopes_supported,
"code_challenge_methods_supported": asm.code_challenge_methods_supported,
}
if include_raw and asm.raw_metadata:
asm_dict["raw_metadata"] = asm.raw_metadata
d["auth_server_metadata"] = asm_dict
return d
def print_summary(results: list[McpServerAuthProbeResult]) -> None:
"""Print a human-readable summary of results."""
print("\n" + "=" * 80)
print("MCP SERVER AUTH DISCOVERY PROBE SUMMARY")
print("=" * 80)
for result in results:
print(f"\n{'-' * 80}")
print(f"Server: {result.server_url}")
print(f"Classification: {result.classification}")
print(f"Timestamp: {result.probe_timestamp}")
if result.notes:
print("\nNotes:")
for note in result.notes:
print(f" - {note}")
if result.initial_probe:
ip = result.initial_probe
print(f"\nInitial Probe:")
print(f" Status: {ip.status_code}")
print(f" Requires Auth: {ip.requires_auth}")
if ip.error:
print(f" Error: {ip.error}")
if ip.www_authenticate and ip.www_authenticate.raw_header:
print(f" WWW-Authenticate: {ip.www_authenticate.raw_header[:100]}...")
if ip.www_authenticate.resource_metadata_url:
print(
f" resource_metadata: {ip.www_authenticate.resource_metadata_url}"
)
if ip.www_authenticate.scope:
print(f" scope: {ip.www_authenticate.scope}")
if result.protected_resource_metadata:
prm = result.protected_resource_metadata
print(f"\nProtected Resource Metadata:")
print(f" Available: {prm.available}")
if prm.successful_url:
print(f" Successful URL: {prm.successful_url}")
if prm.resource:
print(f" Resource: {prm.resource}")
if prm.authorization_servers:
print(f" Authorization Servers: {prm.authorization_servers}")
if prm.scopes_supported:
print(f" Scopes Supported: {prm.scopes_supported}")
print(" Discovery Attempts:")
for attempt in prm.discovery_attempts:
status = f"[{attempt.status_code}]" if attempt.status_code else "[--]"
success_marker = "✓" if attempt.success else "✗"
print(f" {success_marker} {status} {attempt.url}")
if attempt.error and not attempt.success:
print(f" Error: {attempt.error}")
if result.auth_server_metadata:
asm = result.auth_server_metadata
print(f"\nAuth Server Metadata:")
print(f" Available: {asm.available}")
if asm.discovery_type:
print(f" Discovery Type: {asm.discovery_type}")
if asm.successful_url:
print(f" Successful URL: {asm.successful_url}")
if asm.issuer:
print(f" Issuer: {asm.issuer}")
if asm.authorization_endpoint:
print(f" Authorization Endpoint: {asm.authorization_endpoint}")
if asm.token_endpoint:
print(f" Token Endpoint: {asm.token_endpoint}")
if asm.registration_endpoint:
print(f" Registration Endpoint: {asm.registration_endpoint}")
if asm.scopes_supported:
print(f" Scopes Supported: {asm.scopes_supported}")
if asm.code_challenge_methods_supported:
print(f" PKCE Methods: {asm.code_challenge_methods_supported}")
print(" Discovery Attempts:")
for attempt in asm.discovery_attempts:
status = f"[{attempt.status_code}]" if attempt.status_code else "[--]"
success_marker = "✓" if attempt.success else "✗"
print(f" {success_marker} {status} {attempt.url}")
if attempt.error and not attempt.success:
print(f" Error: {attempt.error}")
print("\n" + "=" * 80)
classifications: dict[str, int] = {}
for r in results:
classifications[r.classification] = classifications.get(r.classification, 0) + 1
print("Classification Summary:")
for cls, count in sorted(classifications.items()):
print(f" {cls}: {count}")
print("=" * 80 + "\n")
def main() -> None:
parser = argparse.ArgumentParser(
description="Probe MCP server endpoints to classify their auth mechanisms.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s https://microsoft365.mcp.claude.com/mcp
%(prog)s https://server1.example.com/mcp https://server2.example.com/mcp
%(prog)s --json https://server.example.com/mcp > results.json
Classifications:
full-oauth-2025-06-18 - Has PRM and auth server metadata (latest MCP spec)
legacy-oauth-2025-03-26 - Has auth server metadata but no PRM (older MCP spec)
prm-only - Has PRM but no discoverable auth server metadata
no-auth-required - Server does not require authentication
auth-required-no-metadata - Server requires auth but no metadata discoverable
error - Connection or request error
unknown - Could not determine classification
""",
)
parser.add_argument("urls", nargs="+", help="MCP server URLs to probe")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
parser.add_argument(
"--json-pretty",
action="store_true",
help="Output results as pretty-printed JSON",
)
parser.add_argument(
"--include-raw", action="store_true", help="Include raw metadata in JSON output"
)
args = parser.parse_args()
results = asyncio.run(probe_multiple_servers(args.urls))
if args.json or args.json_pretty:
output = [result_to_dict(r, include_raw=args.include_raw) for r in results]
if args.json_pretty:
print(json.dumps(output, indent=2))
else:
print(json.dumps(output))
else:
print_summary(results)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment