|
#!/usr/bin/env python3 |
|
""" |
|
WebAuthn virtual authenticator passkey flow via Chrome DevTools Protocol (CDP).

Navigates to a URL, enables a virtual CTAP2 authenticator, clicks a button
to trigger passkey creation/assertion, waits for the credential, and takes a
screenshot of the final page state.

Supports cross-origin iframes: use --iframe-origin to target a passkey button
inside a cross-origin iframe (e.g. --iframe-origin "https://auth.example.com").

Requirements: pip install --break-system-packages websockets
              Headless Chrome running with --remote-debugging-port=9222

Usage:
    python3 webauthn_passkey.py <url> [--button "Sign up"] [--cdp-port 9222] [--screenshot /tmp/shot.png] [--timeout 15]
    python3 webauthn_passkey.py <url> --iframe-origin "https://auth.example.com" --button "Sign up"
|
""" |
|
|
|
import argparse |
|
import asyncio |
|
import base64 |
|
import json |
|
import sys |
|
import urllib.request |
|
|
|
|
|
async def run(args):
    """Run one passkey (WebAuthn) flow against a Chrome page over CDP.

    Steps: discover the first ``page`` target on the local DevTools HTTP
    endpoint, navigate it to ``args.url``, add a virtual CTAP2 authenticator,
    click the element whose trimmed text equals ``args.button`` (inside the
    iframe named by ``args.iframe_origin`` when given), report the WebAuthn
    and console-error events that follow, and save a PNG screenshot.

    Args:
        args: Parsed CLI namespace. Reads ``url``, ``button``,
            ``iframe_origin``, ``cdp_port``, ``screenshot`` and ``timeout``.

    Side effects:
        Prints progress to stdout and problems to stderr, writes the
        screenshot to ``args.screenshot``, and pip-installs ``websockets``
        when it cannot be imported. Calls ``sys.exit(1)`` when no page target
        exists, when the requested iframe cannot be resolved, or when the
        button is not found. The virtual authenticator is not removed
        afterwards.
    """
    try:
        import websockets
    except ImportError:
        print("Installing websockets...", file=sys.stderr)
        import subprocess

        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "--break-system-packages", "-q", "websockets"]
        )
        import websockets

    # --- 1. Discover the CDP target ---
    targets_url = f"http://127.0.0.1:{args.cdp_port}/json/list"
    with urllib.request.urlopen(targets_url) as resp:
        targets = json.loads(resp.read())

    page_targets = [t for t in targets if t.get("type") == "page"]
    if not page_targets:
        print("ERROR: No page targets found. Is Chrome running?", file=sys.stderr)
        sys.exit(1)

    # Only the first page target is driven.
    ws_url = page_targets[0]["webSocketDebuggerUrl"]
    print(f"CDP target: {page_targets[0].get('url', 'unknown')}")
    print(f"WebSocket: {ws_url}")

    # --- helpers ---
    msg_id = 0
    # Messages that arrived while send_cmd was waiting for its own response.
    pending_events = []

    async def send_cmd(ws, method, params=None):
        """Send one CDP command and return its response message.

        Any other message received while waiting is stashed in
        ``pending_events``. A CDP error is printed to stderr, but the
        response is still returned so callers can decide what to do.
        """
        nonlocal msg_id
        msg_id += 1
        payload = {"id": msg_id, "method": method}
        if params:
            payload["params"] = params
        await ws.send(json.dumps(payload))
        while True:
            resp = await ws.recv()
            data = json.loads(resp)
            if "id" in data and data["id"] == msg_id:
                if "error" in data:
                    print(f"CDP error [{method}]: {data['error']}", file=sys.stderr)
                return data
            # stash events for later inspection
            pending_events.append(data)

    async def drain(ws, timeout=2):
        """Return stashed messages plus everything received until the socket goes quiet.

        ``timeout`` restarts on every received message, so it bounds the idle
        gap between messages rather than the total time spent here.
        """
        events = list(pending_events)
        pending_events.clear()
        try:
            while True:
                resp = await asyncio.wait_for(ws.recv(), timeout=timeout)
                events.append(json.loads(resp))
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is an alias of the builtin TimeoutError only
            # from Python 3.11 on; naming the asyncio class keeps the quiet
            # period from crashing the script on older interpreters.
            pass
        return events

    async with websockets.connect(ws_url, max_size=50 * 1024 * 1024) as ws:
        # --- 2. Navigate to the target URL ---
        await send_cmd(ws, "Page.enable")
        await send_cmd(ws, "Runtime.enable")
        await send_cmd(ws, "Page.navigate", {"url": args.url})
        print(f"Navigating to {args.url} ...")
        await asyncio.sleep(3)
        # Kept because the iframe lookup below reads the execution contexts
        # announced during page load.
        initial_events = await drain(ws, 2)

        # Verify navigation
        result = await send_cmd(
            ws,
            "Runtime.evaluate",
            {"expression": "window.location.href", "returnByValue": True},
        )
        current_url = result.get("result", {}).get("result", {}).get("value", "")
        print(f"Page loaded: {current_url}")

        # --- 3. Enable WebAuthn + add virtual authenticator ---
        await send_cmd(ws, "WebAuthn.enable", {"enableUI": False})
        result = await send_cmd(
            ws,
            "WebAuthn.addVirtualAuthenticator",
            {
                "options": {
                    "protocol": "ctap2",
                    "transport": "internal",
                    "hasResidentKey": True,
                    "hasUserVerification": True,
                    "isUserVerified": True,
                    "automaticPresenceSimulation": True,
                }
            },
        )
        auth_id = result.get("result", {}).get("authenticatorId", "")
        print(f"Virtual authenticator: {auth_id}")

        # --- 4. Resolve target execution context ---
        # If --iframe-origin is set, find the cross-origin iframe's context.
        # Otherwise use the default (top-level frame).
        target_context_id = None

        if args.iframe_origin:
            iframe_origin = args.iframe_origin.rstrip("/")
            print(f"Looking for cross-origin iframe from: {iframe_origin}")

            # Collect execution contexts we already received during page load
            context_events = [
                e for e in initial_events
                if e.get("method") == "Runtime.executionContextCreated"
            ]

            # Also drain a bit more in case iframe loaded late
            extra = await drain(ws, 3)
            context_events += [
                e for e in extra
                if e.get("method") == "Runtime.executionContextCreated"
            ]

            # Find the context whose origin matches the iframe. There is no
            # break, so the most recently announced matching context wins.
            for ev in context_events:
                ctx = ev.get("params", {}).get("context", {})
                ctx_origin = ctx.get("origin", "")
                ctx_aux = ctx.get("auxData", {})
                print(f"  Context id={ctx.get('id')} origin={ctx_origin} frameId={ctx_aux.get('frameId', '')} isDefault={ctx_aux.get('isDefault', '')}")
                if ctx_origin.rstrip("/") == iframe_origin and ctx_aux.get("isDefault"):
                    target_context_id = ctx.get("id")

            if target_context_id is None:
                # Fallback: use Page.getFrameTree to enumerate frames
                ft_result = await send_cmd(ws, "Page.getFrameTree")
                frame_tree = ft_result.get("result", {}).get("frameTree", {})

                def find_frames(node, found=None):
                    """Flatten a frame-tree node into a pre-order list of frame dicts."""
                    if found is None:
                        found = []
                    frame = node.get("frame", {})
                    found.append(frame)
                    for child in node.get("childFrames", []):
                        find_frames(child, found)
                    return found

                all_frames = find_frames(frame_tree)
                iframe_frame_id = None
                for f in all_frames:
                    f_url = f.get("url", "")
                    f_origin = f.get("securityOrigin", "")
                    print(f"  Frame id={f.get('id')} url={f_url} origin={f_origin}")
                    if f_origin.rstrip("/") == iframe_origin or f_url.startswith(iframe_origin):
                        iframe_frame_id = f.get("id")
                        break

                if iframe_frame_id:
                    # Create an isolated world in the iframe to get an execution context.
                    # "grantUniveralAccess" is spelled the way the protocol spells it.
                    iw_result = await send_cmd(ws, "Page.createIsolatedWorld", {
                        "frameId": iframe_frame_id,
                        "grantUniveralAccess": True,
                    })
                    target_context_id = iw_result.get("result", {}).get("executionContextId")
                    print(f"  Created isolated world in iframe: contextId={target_context_id}")

            if target_context_id is None:
                print(f"ERROR: Could not find iframe with origin {iframe_origin}", file=sys.stderr)
                print("Available frames/contexts are listed above.", file=sys.stderr)
                sys.exit(1)

            print(f"Using execution context {target_context_id} for iframe {iframe_origin}")

            # Best-effort permission grant for the iframe origin. The flow
            # continues either way, but success is only reported when CDP
            # accepted the request.
            # NOTE(review): confirm these permission names are accepted by the
            # Browser domain of the Chrome version in use.
            grant = await send_cmd(ws, "Browser.grantPermissions", {
                "permissions": ["publicKeyCredentialCreate", "publicKeyCredentialGet"],
                "origin": iframe_origin,
            })
            if "error" in grant:
                print(
                    f"WARNING: Could not grant WebAuthn permissions to {iframe_origin}",
                    file=sys.stderr,
                )
            else:
                print(f"Granted WebAuthn permissions to {iframe_origin}")

        # --- 5. Click the target button ---
        # Drop queued messages so the wait in step 6 starts from a clean slate.
        await drain(ws, 1)

        button_text = args.button
        # json.dumps turns the button text into a safely quoted JS string literal.
        click_expr = f"""
        (() => {{
            const buttons = document.querySelectorAll('button, [role="button"], a');
            for (const btn of buttons) {{
                if (btn.textContent.trim() === {json.dumps(button_text)}) {{
                    btn.click();
                    return 'clicked: ' + btn.textContent.trim();
                }}
            }}
            const all = Array.from(buttons).map(b => b.textContent.trim()).filter(Boolean);
            return 'NOT FOUND. Available buttons: ' + all.join(', ');
        }})()
        """
        eval_params = {"expression": click_expr, "returnByValue": True}
        if target_context_id is not None:
            eval_params["contextId"] = target_context_id

        result = await send_cmd(ws, "Runtime.evaluate", eval_params)
        click_result = result.get("result", {}).get("result", {}).get("value", "")
        print(f"Button: {click_result}")

        if "NOT FOUND" in click_result:
            # Nothing was clicked, so no passkey flow can start. The available
            # button texts were already printed above; add a hint for iframes.
            if args.iframe_origin:
                print("Button not found in iframe context. Try --button with exact button text.")
            sys.exit(1)

        # --- 6. Wait for passkey flow ---
        print(f"Waiting up to {args.timeout}s for passkey flow...")
        events = await drain(ws, args.timeout)
        for ev in events:
            method = ev.get("method", "")
            if method == "WebAuthn.credentialAdded":
                cred = ev.get("params", {}).get("credential", {})
                print(f"✓ Credential added! rpId={cred.get('rpId', '')}")
            elif method == "WebAuthn.credentialAsserted":
                print("✓ Credential asserted!")
            elif method == "Runtime.consoleAPICalled":
                args_list = ev.get("params", {}).get("args", [])
                log_type = ev.get("params", {}).get("type", "log")
                if log_type == "error":
                    # Each console argument is truncated to 200 characters.
                    msgs = [
                        str(a.get("value", a.get("description", "")))[:200]
                        for a in args_list
                        if a.get("value") or a.get("description")
                    ]
                    if msgs:
                        print(f"  Console error: {' '.join(msgs)}")

        # Allow page to settle
        await asyncio.sleep(2)
        await drain(ws, 1)

        # --- 7. Verify final state ---
        verify_params = {"expression": "document.body.innerText", "returnByValue": True}
        if target_context_id is not None:
            verify_params["contextId"] = target_context_id
        result = await send_cmd(ws, "Runtime.evaluate", verify_params)
        page_text = result.get("result", {}).get("result", {}).get("value", "")
        print(f"\nPage state:\n{page_text[:600]}")

        # Check credentials on authenticator
        if auth_id:
            result = await send_cmd(ws, "WebAuthn.getCredentials", {"authenticatorId": auth_id})
            creds = result.get("result", {}).get("credentials", [])
            print(f"\nCredentials registered: {len(creds)}")
            for c in creds:
                print(f"  rpId={c.get('rpId', '')} resident={c.get('isResidentCredential', '')}")

        # --- 8. Screenshot ---
        result = await send_cmd(ws, "Page.captureScreenshot", {"format": "png"})
        img_data = result.get("result", {}).get("data", "")
        if img_data:
            # CDP returns the PNG as base64 text.
            with open(args.screenshot, "wb") as f:
                f.write(base64.b64decode(img_data))
            print(f"\n✓ Screenshot saved to {args.screenshot}")
        else:
            print("WARNING: Could not capture screenshot", file=sys.stderr)
|
|
|
|
|
def main():
    """Parse the command line and run the passkey flow once.

    Takes one positional ``url`` plus the ``--button``, ``--iframe-origin``,
    ``--cdp-port``, ``--screenshot`` and ``--timeout`` options, then hands the
    parsed namespace to ``run`` inside a fresh asyncio event loop.
    """
    cli = argparse.ArgumentParser(description="WebAuthn passkey flow via CDP")
    cli.add_argument("url", help="URL to navigate to")

    # Optional flags, registered in the order they appear in --help.
    option_specs = (
        (
            "--button",
            {"default": "Sign up", "help": 'Button text to click (default: "Sign up")'},
        ),
        (
            "--iframe-origin",
            {
                "default": None,
                "help": 'Origin of cross-origin iframe containing the passkey button (e.g. "https://auth.example.com")',
            },
        ),
        (
            "--cdp-port",
            {"type": int, "default": 9222, "help": "CDP port (default: 9222)"},
        ),
        (
            "--screenshot",
            {"default": "/tmp/webauthn_screenshot.png", "help": "Screenshot output path"},
        ),
        (
            "--timeout",
            {
                "type": int,
                "default": 15,
                "help": "Seconds to wait for passkey flow (default: 15)",
            },
        ),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    parsed = cli.parse_args()
    asyncio.run(run(parsed))
|
|
|
|
|
# Start the CLI only when this file is executed as a script, so importing the
# module has no side effects.
if __name__ == "__main__":
    main()