Skip to content

Instantly share code, notes, and snippets.

@wware
Last active March 17, 2025 19:13
Show Gist options
  • Save wware/e836a806c39b2e7fd412bd074024d493 to your computer and use it in GitHub Desktop.
Save wware/e836a806c39b2e7fd412bd074024d493 to your computer and use it in GitHub Desktop.

Vibe Coding Meets Test-Driven Development

The Convergence of Approaches

Vibe coding - the practice of writing code that "feels right" and follows natural patterns - might seem at odds with Test-Driven Development (TDD)'s rigorous, test-first methodology. However, when combined thoughtfully, these approaches can create exceptionally robust and maintainable code.

A Concrete Example: WebAuthn Implementation

The WebAuthn implementation showcased in this repository demonstrates how these methodologies can work together effectively:

  1. The vibe-driven aspects include:

    • Intuitive API endpoint naming (/register/start, /login/complete)
    • Natural class hierarchies and inheritance patterns
    • Clear separation of concerns between components
    • Consistent error handling patterns
    • Readable, self-documenting code structure
  2. The TDD aspects provide:

    • Comprehensive test coverage
    • Clear specification of expected behaviors
    • Validation of edge cases
    • Performance guarantees
    • Security verification

Making It Work Together

Start with the Tests

While vibe coding encourages following intuitive patterns, starting with tests helps define:

  • Expected behaviors
  • Edge cases
  • Error conditions
  • Performance requirements
  • Security constraints

The WebAuthn test suite demonstrates this by covering everything from basic CRUD operations to complex authentication flows.

Use Tools Effectively

Modern development tools help maintain both code quality and testing rigor:

  1. Ruff

    • Fast Python linter
    • Catches common mistakes
    • Enforces consistent style
    • Identifies potential bugs
  2. Flake8

    • Style guide enforcement
    • Complexity checking
    • Dead code detection
    • Import order validation
  3. Pylint

    • Deep static analysis
    • Code smell detection
    • Maintainability metrics
    • Documentation checking
  4. Pytest

    • Powerful test framework
    • Fixture management
    • Parameterized testing
    • Coverage reporting

The Feedback Loop

The magic happens in the feedback loop between vibe coding and TDD:

  1. Write tests that capture the intended behavior
  2. Let the vibe guide the implementation
  3. Use tools to verify correctness
  4. Refactor based on both intuition and test results
  5. Repeat

Benefits of the Combined Approach

1. Confidence with Flexibility

  • Tests provide confidence that the code works
  • Vibe coding keeps the implementation clean and intuitive
  • Tools ensure consistency and catch mistakes

2. Maintainable Security

The WebAuthn example shows how security-critical code can be both:

  • Thoroughly tested for correctness
  • Written in a clear, maintainable style

3. Natural Documentation

  • Tests serve as specifications
  • Code structure reflects natural patterns
  • Tools enforce documentation standards

4. Sustainable Development

  • Tests catch regressions
  • Code remains readable
  • Tools automate quality checks
  • Development stays efficient long-term

Best Practices

  1. Write Tests First

    • Define expected behavior
    • Consider edge cases
    • Plan error handling
    • Set performance expectations
  2. Follow the Vibe

    • Use intuitive names
    • Create clear structures
    • Follow natural patterns
    • Keep it readable
  3. Trust but Verify

    • Run comprehensive tests
    • Use multiple tools
    • Check coverage
    • Validate security
  4. Refactor Fearlessly

    • Tests provide safety net
    • Tools catch mistakes
    • Vibe guides improvements
    • Maintain clarity

Conclusion

The synthesis of vibe coding and TDD, supported by modern development tools, creates a powerful methodology for developing correct, maintainable code. The WebAuthn implementation demonstrates how this approach can successfully handle even complex, security-critical systems.

By starting with comprehensive tests, following natural coding patterns, and using tools to verify correctness, we can create code that is both robust and enjoyable to work with. This combined approach provides the best of both worlds: the confidence of thorough testing with the clarity and maintainability of well-structured code.

WebAuthn Implementation Test Suite

Overview

This test suite verifies a WebAuthn (Web Authentication) implementation built with FastAPI. The tests cover the complete authentication flow, database operations, token management, and HTTPS functionality. The implementation allows users to register and authenticate using platform authenticators (like Windows Hello) or cross-platform authenticators (like security keys).

Test Structure

The test suite is divided into four main areas:

  1. Registration and Authentication (test_registration.py)
  2. API Token Management (test_api_token.py)
  3. Database Operations (test_database.py)
  4. HTTPS Server Functionality (test_https.py)

Core Functionality Tests

WebAuthn Registration Flow

The registration tests verify that:

  • New users can successfully register with the system
  • Duplicate registrations are properly rejected
  • Registration challenges are correctly generated and verified
  • Credential data is properly stored
  • The registration process follows WebAuthn specifications

Example test: test_registration_flow() simulates a complete registration process, including mock authenticator responses and credential verification.

WebAuthn Authentication Flow

Authentication tests ensure:

  • Registered users can successfully log in
  • Invalid credentials are rejected
  • Authentication challenges are properly handled
  • Non-existent users cannot authenticate
  • The authentication process adheres to WebAuthn security requirements

Example test: test_authentication_flow() verifies the complete login process, including challenge verification and token generation.

Security Features

Token Management

The token-related tests (test_api_token.py) verify:

  • JWT tokens are correctly generated upon successful authentication
  • Tokens contain proper claims and expiration times
  • Protected endpoints properly validate tokens
  • Expired tokens are rejected
  • Token revocation works as expected

Key test: test_token_expiration() ensures that expired tokens cannot access protected resources.

Security Validations

Security-specific tests verify:

  • Challenge-response mechanisms work correctly
  • Invalid challenges are rejected
  • Revoked tokens cannot be reused
  • User verification requirements are enforced
  • Origin validation is properly implemented

Database Operations

CRUD Operations

Database tests (test_database.py) verify:

  • User creation and retrieval work correctly
  • Challenge storage and retrieval function properly
  • Credential management is reliable
  • Database cleanup operations work as expected

Performance

Performance tests ensure:

  • Database operations complete within acceptable time limits
  • The system can handle multiple concurrent operations
  • Resource cleanup is efficient

Example: test_database_performance() verifies that 100 user creations complete within 1 second.

HTTPS Implementation

Server Configuration

HTTPS tests (test_https.py) verify:

  • The server successfully serves content over HTTPS
  • SSL certificates are properly configured
  • Secure connections are established correctly
  • Certificate validation works as expected

Connection Security

Tests ensure:

  • Only HTTPS connections are accepted
  • SSL/TLS handshakes complete successfully
  • Certificate chain validation works correctly
  • Proper security headers are present

Test Infrastructure

Fixtures

The test suite uses several key fixtures:

  • database: Provides an isolated test database
  • https_server: Manages the HTTPS test server
  • authenticated_client: Provides a pre-authenticated test client
  • mock_credential_factory: Generates test WebAuthn credentials

Mocking

The tests use strategic mocking to:

  • Simulate authenticator responses
  • Control challenge generation
  • Manage token creation
  • Handle SSL certificate validation

Correctness Guarantees

The test suite provides confidence in the implementation by:

  1. Complete Coverage: Testing all aspects of the WebAuthn flow from registration through authentication

  2. Security Validation: Verifying that security measures are properly implemented and cannot be bypassed

  3. Edge Cases: Testing boundary conditions and error scenarios to ensure robust handling

  4. Standards Compliance: Verifying adherence to WebAuthn specifications and security best practices

  5. Integration Testing: Ensuring all components work together correctly in realistic scenarios

Conclusion

This test suite provides strong assurance that the WebAuthn implementation:

  • Correctly implements the WebAuthn specification
  • Properly handles user authentication
  • Maintains security throughout the process
  • Manages tokens and sessions correctly
  • Performs efficiently at scale
  • Provides proper HTTPS security

The comprehensive nature of these tests, combined with their focus on security and edge cases, provides confidence in the robustness and correctness of the implementation.

-----BEGIN CERTIFICATE-----
MIIDnTCCAoUCFCYNkeLTrGPOPr1k8KOGCKPWON7oMA0GCSqGSIb3DQEBCwUAMIGK
MQswCQYDVQQGEwJVUzELMAkGA1UECAwCTUExEzARBgNVBAcMCkZyYW1pbmdoYW0x
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJV2ls
bCBXYXJlMSIwIAYJKoZIhvcNAQkBFhN3aWxsLndhcmVAZ21haWwuY29tMB4XDTI1
MDMxNDE0NDEwMloXDTI2MDMxNDE0NDEwMlowgYoxCzAJBgNVBAYTAlVTMQswCQYD
VQQIDAJNQTETMBEGA1UEBwwKRnJhbWluZ2hhbTEhMB8GA1UECgwYSW50ZXJuZXQg
V2lkZ2l0cyBQdHkgTHRkMRIwEAYDVQQDDAlXaWxsIFdhcmUxIjAgBgkqhkiG9w0B
CQEWE3dpbGwud2FyZUBnbWFpbC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQCeWgMjNxfp6Id+I6Ryr2lVqEke2ovTgKODBRQD1s33eXpVEWze3XIj
yLXgLXQd9DXiMvSt/IexYkqz4WDrwsc6RC+2sd/ow4dOUC88oNHoBX6MWrY3b3h6
EmCE8x4kOMAXs/Wv110Mlu80YbSebfvxQJRvM0xGLgisdIrhdgzjHp8qVxqcVbJ0
Ecytr9X9YaJ1tIG0uR3BfY9hpCSz0ql/tgTz7CHyvMaRLRSkbErmcbCUhkbIC7r+
HtIilRzKSkLvxtgkLXqQ7iw7ZTpaq0ZCgGAANk/9MR6BmR8M+y57CX6Gdl3AJIOy
uW8sTiQdBhT3VAYAWGRjS1Y0SIzk+oTlAgMBAAEwDQYJKoZIhvcNAQELBQADggEB
ADhNdSoBFKrsjZm48Cy8LPF7E6jSCoxzpPRQBUgAW0J6n+hGweaa4VXLTvaRpLN2
B39oSrwShJkNRRtAhYxOEylN78E26Nv8lCn7LpAz+jVa0FVwk8rkmnMtNzsQ1W1+
1Zl2aQNyBiPJsQnPkUVM/mJGhEeZ2jPlsjhvJmZGInfguOaZScuVdxMmQOXXsMsp
16IeALIlNhGZZOWJ1OvNMULCNWhIrorZ666SgVjyERNxC39K5G+QVUj8udZsr+Cb
vHWhFjDNCCS0+o/voPtcELXGOYWzrnejYKn+G7hjewZJ8pZIJjAybyOtk7uk16PG
xwHrCaV2sNDy+0JEmq40XrM=
-----END CERTIFICATE-----
#!/bin/bash
# Generate a private key
openssl genrsa -out key.pem 2048
# Generate a certificate signing request (CSR)
openssl req -new -key key.pem -out csr.pem
# Generate a self-signed certificate (valid for 365 days)
openssl x509 -req -days 365 -in csr.pem -signkey key.pem -out cert.pem
# Clean up the CSR file
rm csr.pem
echo "Self-signed certificate generated:"
echo "Private key: key.pem"
echo "Certificate: cert.pem"
"""
Common interfaces and base classes used across the application.
"""
from contextlib import contextmanager
import logging
import os
import threading
import time
import uvicorn
from abc import ABC, abstractmethod
from typing import Optional, Dict
class DatabaseInterface(ABC):
@abstractmethod
def add_user(self, username: str, password_hash: str) -> bool:
"""
Add a new user to the database.
Returns True if successful, False if user already exists.
"""
pass
@abstractmethod
def get_user(self, username: str) -> Optional[Dict]:
"""
Retrieve user information.
Returns None if user doesn't exist.
"""
pass
@abstractmethod
def verify_credentials(self, username: str, password_hash: str) -> bool:
"""
Check if the credentials are valid.
"""
pass
@abstractmethod
def add_challenge(self, username: str, challenge: str) -> bool:
"""
Store a challenge for a user.
Returns True if successful.
"""
pass
@abstractmethod
def get_challenge(self, username: str) -> Optional[str]:
"""
Retrieve the current challenge for a user.
Returns None if no challenge exists.
"""
pass
# Test cleanup methods
@abstractmethod
def clear_users(self) -> None:
"""Remove all users (primarily for testing)."""
pass
@abstractmethod
def clear_credentials(self) -> None:
"""Remove all credentials (primarily for testing)."""
pass
@abstractmethod
def clear_challenges(self) -> None:
"""Remove all challenges (primarily for testing)."""
pass
@contextmanager
def https_server(app, host="127.0.0.1", port=8000):
"""
Context manager for running an HTTPS server.
Args:
app: FastAPI application instance
host: Server host (default: 127.0.0.1)
port: Server port (default: 8000)
Yields:
uvicorn.Server: The running server instance
"""
logger = logging.getLogger(__name__)
logger.info("Starting HTTPS server...")
# Check if SSL certificates exist
if not os.path.exists("key.pem") or not os.path.exists("cert.pem"):
raise FileNotFoundError(
"SSL certificates not found. Please ensure key.pem and cert.pem exist."
)
# Create and configure the server
config = uvicorn.Config(
app=app,
host=host,
port=port,
ssl_keyfile="key.pem",
ssl_certfile="cert.pem",
log_level="error"
)
server = uvicorn.Server(config)
# Override server install_signal_handlers to do nothing
server.install_signal_handlers = lambda: None
# Start the server in a thread
thread = threading.Thread(target=server.run)
thread.daemon = True
thread.start()
# Give the server a moment to start
time.sleep(1)
logger.info("Server started")
try:
yield server
finally:
# Cleanup
logger.info("Shutting down server...")
server.should_exit = True
thread.join(timeout=1.0)
logger.info("Server stopped")
"""
Pytest configuration and fixtures.
This module provides shared fixtures and configuration for the test suite:
- Logging configuration for test execution
- Automatic server state reset between tests
- In-memory database cleanup
"""
# conftest.py
import logging
import threading
from typing import Optional, Dict
import time
import os
import pytest
import uvicorn
from main import app
# Set up logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('tests')
@pytest.fixture(scope="session")
def database():
"""Provide a database interface for the entire test session."""
from main import SQLiteDatabase
return SQLiteDatabase()
@pytest.fixture(scope="module")
def https_server(database):
"""Start a test HTTPS server for the duration of the tests"""
logger.info("Starting test HTTPS server...")
# Check if SSL certificates exist
if not os.path.exists("key.pem") or not os.path.exists("cert.pem"):
pytest.skip("SSL certificates not found. Please ensure key.pem and cert.pem exist.")
# Clear state before test
database.clear_users()
database.clear_credentials()
database.clear_challenges()
from main import users_db, credentials_db, challenge_db, revoked_tokens
users_db.clear()
credentials_db.clear()
challenge_db.clear()
revoked_tokens.clear()
# Create and configure the server
config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=8000,
ssl_keyfile="key.pem",
ssl_certfile="cert.pem",
log_level="error"
)
server = uvicorn.Server(config)
# Store server instance for cleanup
app._test_server = server
# Override server install_signal_handlers to do nothing
server.install_signal_handlers = lambda: None
# Start the server in a thread
thread = threading.Thread(target=server.run)
thread.daemon = True
thread.start()
# Give the server a moment to start
time.sleep(1)
logger.info("Test server started")
yield
server.should_exit = True
thread.join()
logger.info("Server stopped")
try:
# Reset database and memory state
database.clear_users()
database.clear_credentials()
database.clear_challenges()
users_db.clear()
credentials_db.clear()
challenge_db.clear()
revoked_tokens.clear()
except Exception as e:
logger.error(f"Error during state reset: {e}")
finally:
logger.debug("Server state reset completed")
@pytest.fixture
def short_lived_token():
"""Create a token that expires in 1 second for testing"""
from datetime import timedelta
from main import create_access_token
def _create_token(username="test_user"):
test_data = {"sub": username}
return create_access_token(
test_data,
expires_delta=timedelta(seconds=1) # Token expires in 1 second
)
return _create_token
for x in *.py; do printf "####### $x ######\n\n"; cat $x; printf "\n\n\n\n\n"; done > ../p.txt
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCeWgMjNxfp6Id+
I6Ryr2lVqEke2ovTgKODBRQD1s33eXpVEWze3XIjyLXgLXQd9DXiMvSt/IexYkqz
4WDrwsc6RC+2sd/ow4dOUC88oNHoBX6MWrY3b3h6EmCE8x4kOMAXs/Wv110Mlu80
YbSebfvxQJRvM0xGLgisdIrhdgzjHp8qVxqcVbJ0Ecytr9X9YaJ1tIG0uR3BfY9h
pCSz0ql/tgTz7CHyvMaRLRSkbErmcbCUhkbIC7r+HtIilRzKSkLvxtgkLXqQ7iw7
ZTpaq0ZCgGAANk/9MR6BmR8M+y57CX6Gdl3AJIOyuW8sTiQdBhT3VAYAWGRjS1Y0
SIzk+oTlAgMBAAECggEAJb8qss0J/gfgLn0kEPOH7jOClKB/htAKCiweeIjOibCz
LM6Nz3VlpSqN0k27eX1yj8RaR5QkK3BVAtqVfjWsxJKm9guiJovfd9dM30ytpLuU
jjP8biv8ActaJy8F7xf5EF1lUkCJ7XuYMzocMkzg6wIcMALAKpWEG0nfpj179nKA
ife8790kvBgftHX73UeXoSeU0WbFRKYGkBkV+U5hVF+EJcq9mj7sLUst0U5n95xH
VaXzQnrlRfkLVw6ul5JfpMvUKc/dIX3lflfp3gnCKKYIwEiKfGQ/NWcy6OJkyoZO
PQW1mJK0W90V2T40Gv8Ej8wrOUrpSX7YcYFFcULKOQKBgQDdv+1Hqe3XiPXC3i9Z
mKM9o/DNTpx5Yn4f1TgoHhzZkBEP5K3Kn3RBy1qD69bo5KPzKQC21NkKXspA5LO4
DN2i30Aa1/r1Ymo2oGoZJy7RpOXnoDdAADJ6OzGSRbPd37Qk7cza7tWiuTb93VoC
VQiT9jkGxxCmfSqKPllEF9BFCQKBgQC2z0x9LBP4FUmfKc+e/GvjKY3Cbzzovya2
LZWapGUutahl7nN7PdMBZk9RHIoz10BH8p5k38XdVQGrTre0+WJt2BtS9wasiYP2
tzzSPeCiLk8H3qZePadNU+3lkcQuF943lYZhMwrytltkrlhNqXnRZsH5kRqBJnLU
mVlxjhmz/QKBgQCsNDXCJ1aKwNOJpx3CSGTMo5mG9ntRUIl9kwlvLiNdWSHG4+GK
rtWLmq4KMIXOinA/jyH82cJK6V3Lk091qJgduZ2AuyUqlmjBT0XVjrYvHA1mGOeP
DhglD/7pBrQto2G0I++oPT0uhwMLlnAly39D2Dxk5QYrJJx6ivyrgoq8cQKBgH0Y
EXE+LlP+zSg7BjX26STDFJxOXlGuLCFRxCPUBuEVsaw+pYtZ3QACMJiCfRfIXdZd
8YF5lv5D4yaEcTVHydD8poX5P/nlCAZVkSsU6JhSNAZgl1u5uLgzjIEhZCtebdN/
YDs3IHDB1HxqyUYriI6AUJAkeQyTcNSGMyx/XptJAoGAKvyDKMc6WkmWTHwrY2tS
BAqe9ebiy/Lf7/V5ylfFrfsBZ9ZT/RQoL3ji0yzaVbdcMkTCFt0GFzsiOriBuUQc
o8C4PhPhxF6lM1lqf7vNDJ4TJ7K7BAdBGvH5OFvpUG5hr7wsZbrgp9oHiP70ezy+
SerixMQtkwH0oYCU4pXm96g=
-----END PRIVATE KEY-----
"""
WebAuthn Implementation with FastAPI
Implement WebAuthn (Web Authentication) in a FastAPI application.
Provide endpoints for registration and authentication.
"""
import os
import base64
import logging
import secrets
import uuid
from datetime import datetime, timedelta, UTC
import hashlib
import threading
import sqlite3
import time
from sqlite3 import Connection
from typing import Dict, Optional
from pydantic import BaseModel
import jwt
from jwt.exceptions import PyJWTError
from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordBearer
import webauthn
from webauthn.helpers.structs import (
PublicKeyCredentialCreationOptions,
PublicKeyCredentialRequestOptions,
PublicKeyCredentialUserEntity,
PublicKeyCredentialRpEntity,
RegistrationCredential,
AuthenticationCredential,
)
from common import DatabaseInterface, https_server
logger = logging.getLogger('main')
class Server:
"""
Class representing the server
"""
def __init__(self, db: DatabaseInterface):
self.database = db
self.revoked_tokens = set()
def register_user(self, username: str, password: str = None) -> bool:
"""
Register a new user.
For WebAuthn, password is None.
For password auth, password is required.
Returns True if registration successful, False if username taken.
"""
if password is None:
# WebAuthn registration, empty password
return self.database.add_user(username, "")
password_hash = self._hash_password(password)
return self.database.add_user(username, password_hash)
def authenticate_user(self, username: str, password: str) -> bool:
"""
Verify user credentials.
"""
password_hash = self._hash_password(password)
return self.database.verify_credentials(username, password_hash)
def create_challenge(self, username: str) -> Optional[str]:
"""
Create and store a new challenge for a user.
Returns None if user doesn't exist.
"""
if not self.database.get_user(username):
return None
challenge = secrets.token_hex(32)
if self.database.add_challenge(username, challenge):
return challenge
return None
def verify_challenge(self, username: str, challenge: str) -> bool:
"""
Verify that a challenge matches what's stored for the user.
"""
stored_challenge = self.database.get_challenge(username)
return stored_challenge is not None and stored_challenge == challenge
@staticmethod
def _hash_password(password: str) -> str:
"""Hash a password using SHA-256."""
return hashlib.sha256(password.encode()).hexdigest()
class SQLiteDatabase(DatabaseInterface):
"""
Wrapper to make SQLite follow the DatabaseInstance interface
"""
def __init__(self):
self._local = threading.local()
@property
def conn(self) -> Connection:
"""
Get a db connection. If necessary, create one, and
populate in-memory database. This is for development only.
"""
if not hasattr(self._local, "conn"):
self._local.conn = sqlite3.connect(':memory:')
self._init_tables(self._local.conn)
return self._local.conn
def _init_tables(self, conn: Connection):
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
password_hash TEXT NOT NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS challenges (
username TEXT PRIMARY KEY,
challenge TEXT NOT NULL,
FOREIGN KEY (username) REFERENCES users(username)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS credentials (
user_id TEXT PRIMARY KEY,
token TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(username)
)
""")
conn.commit()
def add_user(self, username: str, password_hash: str) -> bool:
cursor = self.conn.cursor()
try:
cursor.execute(
"INSERT INTO users (username, password_hash) VALUES (?, ?)",
(username, password_hash)
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
except sqlite3.Error: # Catch other potential SQLite errors
logger.error("Database error while adding user")
return False
def get_user(self, username: str) -> Optional[Dict]:
cursor = self.conn.cursor()
cursor.execute(
"SELECT username, password_hash FROM users WHERE username = ?",
(username,)
)
row = cursor.fetchone()
if row:
return {"username": row[0], "password_hash": row[1]}
return None
def verify_credentials(self, username: str, password_hash: str) -> bool:
cursor = self.conn.cursor()
cursor.execute(
"SELECT 1 FROM users WHERE username = ? AND password_hash = ?",
(username, password_hash)
)
return cursor.fetchone() is not None
def add_challenge(self, username: str, challenge: str) -> bool:
cursor = self.conn.cursor()
try:
cursor.execute(
"""INSERT OR REPLACE INTO challenges
(username, challenge) VALUES (?, ?)""",
(username, challenge)
)
self.conn.commit()
return True
except sqlite3.Error:
return False
def get_challenge(self, username: str) -> Optional[str]:
cursor = self.conn.cursor()
cursor.execute(
"SELECT challenge FROM challenges WHERE username = ?",
(username,)
)
row = cursor.fetchone()
return row[0] if row else None
def clear_users(self):
cursor = self.conn.cursor()
cursor.execute("DELETE FROM users")
self.conn.commit()
def clear_credentials(self):
cursor = self.conn.cursor()
cursor.execute("DELETE FROM credentials")
self.conn.commit()
def clear_challenges(self):
cursor = self.conn.cursor()
cursor.execute("DELETE FROM challenges")
self.conn.commit()
database = SQLiteDatabase()
server = Server(database)
# Initialize FastAPI
app = FastAPI(title="FastAPI WebAuthn Example")
# This would be your database in a real application
users_db = {}
credentials_db = {}
# Store challenges temporarily (in a real app, use Redis or similar)
challenge_db = {}
# Store revoked tokens
revoked_tokens = set()
# Configuration for your application
RELYING_PARTY_ID = "127.0.0.1" # Your domain name
RELYING_PARTY_NAME = "FastAPI WebAuthn Example"
RELYING_PARTY_ORIGIN = "https://127.0.0.1:8000/"
# Add these configuration variables after other config variables
SECRET_KEY = os.environ.get("SECRET_KEY") or secrets.token_urlsafe(32)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Pydantic models for request validation
class RegisterStartRequest(BaseModel):
"""
Request model for initiating WebAuthn registration.
Attributes:
username: The username to register with the system
"""
username: str
class RegisterCompleteRequest(BaseModel):
"""
Request model for completing WebAuthn registration.
Attributes:
username: The username being registered
credential: The WebAuthn credential data from the authenticator
"""
username: str
credential: dict
class LoginStartRequest(BaseModel):
"""
Request model for initiating WebAuthn authentication.
Attributes:
username: The username attempting to log in
"""
username: str
class LoginCompleteRequest(BaseModel):
"""
Request model for completing WebAuthn authentication.
Attributes:
username: The username being authenticated
credential: The WebAuthn assertion data from the authenticator
"""
username: str
credential: dict
# Utility functions
def generate_challenge() -> str:
"""Generate a random challenge for WebAuthn operations"""
random_bytes = secrets.token_bytes(32)
return base64.urlsafe_b64encode(random_bytes).decode('utf-8').rstrip('=')
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create a JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(UTC) + expires_delta
else:
expire = (datetime.now(UTC) +
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Add this near the top with other app initialization
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""Verify JWT token and return current user"""
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Check if token is revoked
if token in revoked_tokens:
raise HTTPException(
status_code=401,
detail="Token has been revoked",
headers={"WWW-Authenticate": "Bearer"},
)
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
if username not in users_db:
raise credentials_exception
return username
except PyJWTError as exc:
raise credentials_exception from exc
# API Endpoints
@app.post("/register/start")
async def register_start(request: RegisterStartRequest):
"""Start the registration process"""
logger.info("Start the registration process")
# Check if user already exists in users_db
if request.username in users_db:
raise HTTPException(status_code=400, detail="User already exists")
# Now this will work with just a username
if not server.register_user(request.username):
raise HTTPException(status_code=400, detail="User already exists")
# Generate a new user ID
user_id = str(uuid.uuid4())
# Store the user (in a real app, you'd save to a database)
users_db[request.username] = {
"id": user_id,
"username": request.username,
"credentials": []
}
# Generate a challenge
challenge = generate_challenge()
challenge_db[request.username] = challenge
# Create WebAuthn registration options
options = PublicKeyCredentialCreationOptions(
rp=PublicKeyCredentialRpEntity(id=RELYING_PARTY_ID,
name=RELYING_PARTY_NAME),
user=PublicKeyCredentialUserEntity(
id=str(user_id).encode('utf-8'), # Use the byte representation
name=request.username,
display_name=request.username,
),
challenge=challenge,
pub_key_cred_params=[
{"type": "public-key", "alg": -7}, # ES256
{"type": "public-key", "alg": -257} # RS256
],
timeout=60000,
attestation="direct",
authenticator_selection={
"authenticator_attachment": "platform",
# or "cross-platform" for security keys
"require_resident_key": False,
"user_verification": "preferred",
},
exclude_credentials=[], # No credentials to exclude for a new user
)
options_dict = {
key: getattr(options, key)
for key in dir(options)
if not key.startswith("_") and key not in ("rp", "user")
}
return JSONResponse(content=options_dict)
@app.post("/register/complete")
async def register_complete(request: RegisterCompleteRequest):
"""Complete the registration process"""
logger.info("Complete the registration process")
username = request.username
logger.debug("Check if user exists")
if username not in users_db:
raise HTTPException(status_code=400, detail="User does not exist")
logger.debug("Get the challenge")
challenge = challenge_db.get(username)
if not challenge:
raise HTTPException(status_code=400, detail="No challenge found")
logger.debug("Parse the credential")
try:
d = dict(request.credential)
d['raw_id'] = d.pop('rawId')
credential = RegistrationCredential(**d)
logger.debug("Verify the registration")
registration_verification = webauthn.verify_registration_response(
credential=credential,
expected_challenge=challenge,
expected_origin=RELYING_PARTY_ORIGIN,
expected_rp_id=RELYING_PARTY_ID,
require_user_verification=False,
)
logger.debug("Store the credential")
credential_id = registration_verification.credential_id
public_key = registration_verification.credential_public_key
logger.debug("In a real app, store these in a secure database")
credentials_db[credential_id] = {
"username": username,
"public_key": public_key,
"sign_count": registration_verification.sign_count,
}
logger.debug("Associate credential with user")
users_db[username]["credentials"].append(credential_id)
logger.debug("Clean up the challenge")
del challenge_db[username]
logger.debug("Registration successful")
return {"status": "success", "message": "Registration successful"}
except Exception as e:
logger.error(e)
raise HTTPException(
status_code=400,
detail=f'Registration failed: {str(e)}'
) from e
@app.post("/login/start")
async def login_start(request: LoginStartRequest):
"""Start the login process"""
logger.info("Start the login process")
username = request.username
logger.debug("Check if user exists")
if username not in users_db:
logger.debug("User does not exist")
raise HTTPException(status_code=400, detail="User does not exist")
logger.debug("Get user's credentials")
user_credential_ids = users_db[username]["credentials"]
if not user_credential_ids:
raise HTTPException(status_code=400,
detail="No credentials found for user")
logger.debug("Create a list of allowed credentials")
allowed_credentials = []
for cred_id in user_credential_ids:
allowed_credentials.append({
"type": "public-key",
"id": cred_id,
})
logger.debug("Generate a challenge")
challenge = generate_challenge()
challenge_db[username] = challenge
logger.debug("Create WebAuthn authentication options")
options = PublicKeyCredentialRequestOptions(
challenge=challenge,
timeout=60000,
rp_id=RELYING_PARTY_ID,
allow_credentials=allowed_credentials,
user_verification="preferred",
)
logger.debug("Return stuff")
options_dict = {
key: getattr(options, key)
for key in dir(options)
if not key.startswith("_") and key not in ("rp", "user")
}
return JSONResponse(content=options_dict)
@app.post("/login/complete")
async def login_complete(request: LoginCompleteRequest):
"""Complete the login process"""
logger.info("Complete the login process")
username = request.username
logger.debug("Check if user exists")
if username not in users_db:
raise HTTPException(status_code=400, detail="User does not exist")
logger.debug("Get the challenge")
challenge = challenge_db.get(username)
if not challenge:
raise HTTPException(status_code=400, detail="No challenge found")
logger.debug("Parse the credential")
try:
d = dict(request.credential)
d['raw_id'] = d.pop('rawId')
credential = AuthenticationCredential(**d)
logger.debug("Get credential data")
credential_id = credential.id
cred_data = credentials_db.get(credential_id)
if not cred_data or cred_data["username"] != username:
raise HTTPException(status_code=400, detail="Invalid credential")
logger.debug("Verify the authentication")
auth_verification = webauthn.verify_authentication_response(
credential=credential,
expected_challenge=challenge,
expected_origin=RELYING_PARTY_ORIGIN,
expected_rp_id=RELYING_PARTY_ID,
credential_public_key=cred_data["public_key"],
credential_current_sign_count=cred_data["sign_count"],
require_user_verification=False,
)
logger.debug("Update the sign count")
credentials_db[credential_id]["sign_count"] = \
auth_verification.new_sign_count
logger.debug("Clean up the challenge")
del challenge_db[username]
# Create access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, # 'sub' is standard JWT claim for subject
expires_delta=access_token_expires
)
logger.debug("Login successful")
return {
"status": "success",
"message": "Login successful",
"access_token": access_token,
"token_type": "bearer"
}
except Exception as e:
raise HTTPException(
status_code=400,
detail=f'Authentication failed: {str(e)}'
) from e
@app.get("/protected")
async def protected_route(current_user: str = Depends(get_current_user)):
"""
Protected endpoint that requires a valid JWT token for access.
Args:
current_user: The authenticated username, extracted from the JWT token
Returns:
dict: Response containing a greeting and protected data including:
- message: Personalized greeting
- data: Dictionary containing secret value and timestamp
"""
return {
"message": f"Hello {current_user}",
"data": {
"secret_value": "This data is only accessible with a valid token",
"timestamp": datetime.now(UTC).isoformat()
}
}
@app.post("/token/revoke")
async def revoke_token(token: str = Depends(oauth2_scheme)):
"""Revoke a JWT token"""
revoked_tokens.add(token)
return {"status": "success", "message": "Token revoked successfully"}
# Mount the MkDocs static site
os.system("mkdocs build")
app.mount("/", StaticFiles(directory="site", html=True), name="site")
if __name__ == "__main__":
with https_server(app, host="0.0.0.0", port=8000) as server:
# Server is running here
try:
# Keep the main thread alive
while not server.should_exit:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Received shutdown signal")
site_name: Auth hacking
site_url: http://localhost:8000/
theme:
name: material
features:
- navigation.tabs
- navigation.indexes
- content.tabs.link
- content.code.annotate
palette:
- media: "(prefers-color-scheme: dark)"
scheme: slate
primary: deep purple
accent: lime
- media: "(prefers-color-scheme: light)"
scheme: default
primary: indigo
accent: pink
plugins:
- search
extra_javascript:
- javascripts/extra.js
- https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.4/MathJax.js?config=TeX-AMS-MML_HTMLorMML
markdown_extensions:
- pymdownx.superfences:
custom_fences:
- name: mermaid
class: mermaid
format: !!python/name:pymdownx.superfences.fence_code_format
- pymdownx.snippets
- mdx_math
nav:
- Home: index.md
- Auth: auth.md
- Tests: auth_tests.md
annotated-types==0.7.0
anyio==4.8.0
asn1crypto==1.5.1
babel==2.17.0
backrefs==5.8
cbor2==5.6.5
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
colorama==0.4.6
cryptography==44.0.2
dnspython==2.7.0
ecdsa==0.19.1
email_validator==2.2.0
fastapi==0.115.11
fastapi-cli==0.0.7
ghp-import==2.1.0
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
idna==3.10
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.6
jwt==1.3.1
Markdown==3.7
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
mergedeep==1.3.4
mkdocs==1.6.1
mkdocs-get-deps==0.2.0
mkdocs-material==9.6.8
mkdocs-material-extensions==1.3.1
Naked==0.1.32
orjson==3.10.15
packaging==24.2
paginate==0.5.7
pathspec==0.12.1
platformdirs==4.3.6
pluggy==1.5.0
pyasn1==0.4.8
pycparser==2.22
pydantic==2.10.6
pydantic-extra-types==2.10.3
pydantic-settings==2.8.1
pydantic_core==2.27.2
Pygments==2.19.1
pymdown-extensions==10.14.3
pyOpenSSL==25.0.0
pytest==8.3.5
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-jose==3.4.0
python-markdown-math==0.8
python-multipart==0.0.20
PyYAML==6.0.2
pyyaml_env_tag==0.1
requests==2.32.3
rich==13.9.4
rich-toolkit==0.13.2
rsa==4.9
shellescape==3.8.1
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
starlette==0.46.1
typer==0.15.2
typing_extensions==4.12.2
ujson==5.10.0
urllib3==2.3.0
uvicorn==0.34.0
uvloop==0.21.0
watchdog==6.0.0
watchfiles==1.0.4
webauthn==2.5.2
websockets==15.0.1
"""
Test suite for WebAuthn API token functionality.
This module tests the JWT token lifecycle including:
- Token creation
- Protected endpoint access validation
- Token expiration handling
- Token revocation mechanism
These tests verify that tokens are properly created upon successful WebAuthn
authentication and that they can be securely used for API access.
"""
import time
import uuid
import base64
import json
import logging
from datetime import datetime, timedelta, UTC
from unittest.mock import patch, MagicMock
import pytest
import jwt
from fastapi.testclient import TestClient
from main import create_access_token, app, SECRET_KEY, ALGORITHM
from main import users_db, credentials_db, challenge_db, generate_challenge
logger = logging.getLogger()
@pytest.fixture
def client():
"""Create a test client for the FastAPI app"""
tc = TestClient(
app,
base_url="https://127.0.0.1:8000/"
)
return tc
@pytest.fixture
def authenticated_client(client):
"""Create an authenticated client with a valid token"""
logger.debug("Setting up authenticated client fixture")
username = "test_user"
if username not in users_db:
logger.debug(f"Creating test user: {username}")
# Add a test user
user_id = str(uuid.uuid4())
users_db[username] = {
"id": user_id,
"username": username,
"credentials": ["test_credential_id"]
}
# Add a test credential
credentials_db["test_credential_id"] = {
"username": username,
"public_key": b"mock_public_key",
"sign_count": 0
}
else:
logger.debug(f"Using existing test user: {username}")
# Always add a challenge (even if the user already exists)
challenge = generate_challenge()
challenge_db[username] = challenge
# Mock the authenticator response
mock_credential = {
"id": "test_credential_id",
"rawId":
base64.urlsafe_b64encode(b"test_credential_id").decode('ascii'),
"type": "public-key",
"response": {
"clientDataJSON": base64.urlsafe_b64encode(json.dumps({
"type": "webauthn.get",
"challenge": challenge_db[username],
"origin": "https://127.0.0.1:8000/"
}).encode()).decode('ascii'),
"authenticatorData":
base64.urlsafe_b64encode(b"mock_auth_data").decode('ascii'),
"signature":
base64.urlsafe_b64encode(b"mock_signature").decode('ascii'),
"userHandle":
base64.urlsafe_b64encode(b"user_handle").decode('ascii')
}
}
with patch('webauthn.verify_authentication_response') as mock_verify:
mock_verification = MagicMock()
mock_verification.new_sign_count = 1
mock_verify.return_value = mock_verification
response = client.post("/login/complete", json={
"username": username,
"credential": mock_credential
})
token_data = response.json()
# Create a client with the token in headers
auth_client = TestClient(app)
auth_client.headers.update(
{"Authorization": f"Bearer {token_data['access_token']}"}
)
# Return both the client and the raw token for tests
yield auth_client, token_data['access_token']
logger.debug("Tearing down authenticated client fixture")
def test_token_creation():
"""
Test the JWT token creation functionality.
This test verifies that:
1. Tokens can be successfully created with the given user data
2. Created tokens can be properly decoded
3. Token contains the expected user subject claim
4. Token includes a valid expiration timestamp in the future
"""
# This can be a unit test for your token creation function
test_data = {"sub": "test_user"}
token = create_access_token(test_data)
# Verify token can be decoded
decoded = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
assert decoded["sub"] == "test_user"
assert "exp" in decoded
# Assert expiration is in the future using timezone-aware comparison
assert datetime.fromtimestamp(decoded["exp"], UTC) > datetime.now(UTC)
def test_protected_endpoint(authenticated_client):
"""
Test access control for protected endpoints using JWT tokens.
This test verifies that:
1. A valid authentication token can access protected endpoints
2. The protected endpoint returns the expected data structure
3. The response contains the required secret_value field
Args:
authenticated_client: Fixture providing an authenticated
client and its token
"""
logger.debug("Starting protected endpoint access test")
client, token = authenticated_client
logger.debug(f"Using token: {token[:10]}... to access protected endpoint")
response = client.get("/protected")
logger.debug(f"Response status code: {response.status_code}")
logger.debug(f"Response body: {response.json()}")
assert response.status_code == 200, \
"Expected 200 OK response for valid token"
assert "secret_value" in response.json()["data"], \
"Expected 'secret_value' in response data"
logger.debug("Protected endpoint test passed successfully")
def test_token_expiration(authenticated_client):
"""Test that tokens expire correctly"""
# Create a token that expires quickly
test_data = {"sub": "test_user"}
token = create_access_token(
test_data,
expires_delta=timedelta(seconds=1) # Token expires in 1 second
)
# Wait for token to expire
time.sleep(2)
# Try to use the expired token
client = TestClient(app)
client.headers.update({"Authorization": f"Bearer {token}"})
response = client.get("/protected")
assert response.status_code == 401, "Expired token should be rejected"
def test_token_revocation(authenticated_client):
"""Test that revoked tokens cannot be reused"""
auth_client, token = authenticated_client
# First verify token works
response = auth_client.get("/protected")
assert response.status_code == 200
# Revoke token
response = auth_client.post("/token/revoke")
assert response.status_code == 200
# Try to use the revoked token
response = auth_client.get("/protected")
assert response.status_code == 401, "Revoked token should be rejected"
"""
Test suite for database operations and performance.
"""
import time
import uuid
import logging
logger = logging.getLogger(__name__)
def test_database_operations(database):
"""Test database CRUD operations"""
username = f"db_test_{uuid.uuid4()}"
password_hash = "test_hash"
# Test user creation
assert database.add_user(username, password_hash), \
"User creation should succeed"
# Test user retrieval
user = database.get_user(username)
assert user is not None, "Should retrieve created user"
assert user["username"] == username, "Retrieved username should match"
assert user["password_hash"] == password_hash, \
"Retrieved password hash should match"
# Test challenge operations
challenge = "test_challenge"
assert database.add_challenge(username, challenge), \
"Challenge creation should succeed"
assert database.get_challenge(username) == challenge, \
"Retrieved challenge should match"
# Test cleanup operations
database.clear_challenges()
assert database.get_challenge(username) is None, \
"Challenge should be cleared"
def test_database_performance(database):
"""Test that database operations are performant"""
username = f"perf_test_{uuid.uuid4()}"
password_hash = "test_hash"
# Measure user creation performance
start_time = time.time()
for i in range(100):
database.add_user(f"{username}_{i}", password_hash)
end_time = time.time()
elapsed = end_time - start_time
assert elapsed < 1.0, ("Database operations too slow: "
f"{elapsed}s for 100 operations")
"""
Test suite for HTTPS functionality.
This module verifies that the application:
- Correctly serves content over HTTPS
- Uses valid SSL certificates (even if self-signed)
- Properly secures connections
"""
import pytest
import requests
import socket
import ssl
import logging
from urllib.parse import urlparse
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from main import app # Import your FastAPI app
from common import https_server
# Configure base URL for your application
BASE_URL = "https://127.0.0.1:8000" # Use a different port to avoid conflicts
logger = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def https_server_fixture():
"""Fixture that provides an HTTPS server for tests"""
with https_server(app) as server:
yield server
@pytest.fixture
def https_session():
"""Create a requests session that accepts
self-signed certificates for testing"""
session = requests.Session()
session.verify = False # Warning: Only for development testing!
requests.packages.urllib3.disable_warnings() # Suppress insecure warnings
return session
def test_https_connection(https_server, https_session):
"""
Test basic HTTPS connection to the server.
This test verifies that:
1. The server successfully accepts HTTPS connections
2. The server responds with a 200 status code
3. The connection uses the HTTPS scheme
Args:
https_server: Fixture providing a running HTTPS test server
https_session: Requests session configured for HTTPS testing
"""
logger.info("Testing HTTPS connection...")
try:
response = https_session.get(f"{BASE_URL}/")
assert response.status_code == 200
# Verify we're using HTTPS
assert urlparse(response.url).scheme == "https"
logger.info("HTTPS connection test passed")
except requests.exceptions.ConnectionError as e:
logger.error(f"Connection error: {e}")
pytest.fail("Failed to connect to HTTPS server")
def test_ssl_certificate(https_server):
"""
Test SSL certificate configuration and validity.
This test verifies that:
1. The server presents a valid SSL certificate
2. The certificate contains either valid SANs or a Common Name
3. The certificate can be properly parsed and validated
4. The certificate matches the server hostname
Args:
https_server: Fixture providing a running HTTPS test server
"""
logger.info("Testing SSL certificate...")
hostname = urlparse(BASE_URL).hostname
port = urlparse(BASE_URL).port or 443
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE # self-signed certs, development
with context.wrap_socket(socket.socket(),
server_hostname=hostname) as s:
s.connect((hostname, port))
cert = s.getpeercert(binary_form=True)
certificate = x509.load_der_x509_certificate(
cert,
default_backend()
)
# Check for Subject Alternative Names (SAN)
extensions = certificate.extensions
for extension in extensions:
if isinstance(extension.value, x509.SubjectAlternativeName):
san_extension = extension.value
for name in san_extension:
if (isinstance(name, x509.DNSName)
and name.value == hostname):
# If we have a working SAN, that's good enough
logger.info("Found valid SAN in certificate")
return
# If we didn't find a SAN then check for a Common Name (CN)
has_cn = any(name.oid == x509.NameOID.COMMON_NAME
for name in certificate.subject)
assert has_cn, "Certificate must have a Common Name"
logger.info("SSL certificate test passed")
except ConnectionRefusedError:
logger.error("Connection refused when testing SSL certificate")
pytest.fail("Failed to connect to HTTPS server "
"for certificate verification")
import logging
import uuid
import time
import json
import base64
from unittest.mock import patch, MagicMock
import sqlite3
import pytest
from fastapi.testclient import TestClient
from main import RELYING_PARTY_ORIGIN
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@pytest.fixture
def db():
"""Set up a test SQLite database"""
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# Create necessary tables
cursor.execute('''
CREATE TABLE users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE credentials (
id TEXT PRIMARY KEY,
user_id TEXT,
public_key BLOB,
sign_count INTEGER,
FOREIGN KEY(user_id) REFERENCES users(id)
)
''')
conn.commit()
return conn
@pytest.fixture
def test_app(database):
"""Create a test app with the test database"""
from main import app
app.database = database # Inject the test database
return app
@pytest.fixture
def client(test_app):
"""Create a test client with the configured test app"""
return TestClient(
test_app,
base_url="http://127.0.0.1:8000/" # Keep the explicit URL for testing
)
@pytest.fixture
def user_setup_teardown(db):
"""
Sets up a user and credential in the database,
yields control for the test to run,
and then tears them down after the test.
"""
username = "test_user"
user_id = "example_user_id" # Define a user_id
credential_id = "test_credential_id"
public_key = b"mock_public_key" # Needs to be bytes
sign_count = 0
cursor = db.cursor()
cursor.execute(
"INSERT INTO users (id, username) VALUES (?, ?);",
(user_id, username),
)
cursor.execute(
"INSERT INTO credentials (id, user_id, public_key, sign_count)"
" VALUES (?, ?, ?, ?);",
(credential_id, user_id, public_key, sign_count),
)
db.commit()
return username
def test_registration_flow(client, db, user_setup_teardown, https_server):
"""
Test the complete WebAuthn registration flow with mocked authenticator.
This test verifies that:
1. The registration start endpoint returns a valid challenge
2. The registration completion endpoint accepts mocked authenticator data
3. User data is properly stored in the database
4. Credential information is correctly saved
Args:
client: TestClient fixture for making HTTP requests
db: Database fixture providing a test SQLite instance
user_setup_teardown: Fixture handling user creation and cleanup
https_server: Fixture for the HTTPS server
"""
MAX_WAIT = 5 # seconds
start_time = time.time()
username = user_setup_teardown
logger.debug("Step 1: Start registration")
response = client.post("/register/start", json={"username": username})
assert response.status_code == 200
reg_data = response.json()
assert "challenge" in reg_data
logger.debug("Step 2: Mock the authenticator response")
# In real testing, use the webauthn-mocks library
# to generate realistic credentials
mock_credential = {
"id": "test_credential_id",
"rawId": base64.urlsafe_b64encode(b"test_raw_id").decode('ascii'),
"type": "public-key",
"response": {
"clientDataJSON": base64.urlsafe_b64encode(json.dumps({
"type": "webauthn.create",
"challenge": reg_data["challenge"],
"origin": "https://localhost:8000/"
}).encode()).decode('ascii'),
"attestationObject": base64.urlsafe_b64encode(
b"mock_attestation_obj"
).decode('ascii')
}
}
# Patch the verify_registration_response function
with patch('webauthn.verify_registration_response') as mock_verify:
# Configure the mock to return a successful verification
mock_verification = MagicMock()
mock_verification.credential_id = "test_credential_id"
mock_verification.credential_public_key = b"mock_public_key"
mock_verification.sign_count = 0
mock_verify.return_value = mock_verification
# Step 3: Complete registration
response = client.post("/register/complete", json={
"username": username,
"credential": mock_credential
})
if time.time() - start_time > MAX_WAIT:
pytest.fail("Test timeout exceeded")
assert response.status_code == 200
assert response.json()["status"] == "success"
# Verify user was added to database
cursor = db.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
assert cursor.fetchone() is not None
# Verify credential was stored
cursor.execute("SELECT * FROM credentials")
assert cursor.fetchone() is not None
def test_authentication_flow(client, db, user_setup_teardown, https_server):
"""
Test the WebAuthn authentication flow with mocked authenticator.
This test verifies that:
1. The authentication start endpoint returns a valid challenge
2. The authentication completion endpoint accepts mocked credentials
3. Successful authentication returns a valid access token
4. The in-memory credential store is properly accessed
Args:
client: TestClient fixture for making HTTP requests
db: Database fixture providing a test SQLite instance
user_setup_teardown: Fixture handling user creation and cleanup
https_server: Fixture for the HTTPS server
"""
from main import users_db, credentials_db
username = user_setup_teardown
# Add timeout to prevent hanging
MAX_WAIT = 5 # seconds
start_time = time.time()
# Ensure the user exists in the in-memory database
user_id = "example_user_id"
if username not in users_db:
users_db[username] = {
"id": user_id,
"username": username,
"credentials": ["test_credential_id"]
}
credentials_db["test_credential_id"] = {
"username": username,
"public_key": b"mock_public_key",
"sign_count": 0
}
# Start authentication with timeout
response = client.post("/login/start", json={"username": username})
assert response.status_code == 200
auth_data = response.json()
assert "challenge" in auth_data
# Mock the authenticator response
mock_credential = {
"id": "test_credential_id",
"rawId": base64.urlsafe_b64encode(b"test_credential_id")
.decode('ascii'),
"type": "public-key",
"response": {
"clientDataJSON": base64.urlsafe_b64encode(json.dumps({
"type": "webauthn.get",
"challenge": auth_data["challenge"],
"origin": "https://localhost:8000"
}).encode()).decode('ascii'),
"authenticatorData":
base64.urlsafe_b64encode(b"mock_auth_data").decode('ascii'),
"signature":
base64.urlsafe_b64encode(b"mock_signature").decode('ascii'),
"userHandle":
base64.urlsafe_b64encode(b"user_handle").decode('ascii')
}
}
# Add timeout check
if time.time() - start_time > MAX_WAIT:
pytest.fail("Test timeout exceeded")
# Complete authentication with timeout
with patch('webauthn.verify_authentication_response') as mock_verify:
mock_verification = MagicMock()
mock_verification.new_sign_count = 1
mock_verify.return_value = mock_verification
response = client.post("/login/complete", json={
"username": username,
"credential": mock_credential
})
if time.time() - start_time > MAX_WAIT:
pytest.fail("Test timeout exceeded")
assert response.status_code == 200
result = response.json()
assert result["status"] == "success"
assert "access_token" in result
@pytest.fixture
def mock_credential_factory():
"""Factory fixture to create mock credentials with different parameters"""
def _make_credential(credential_id, challenge,
origin=RELYING_PARTY_ORIGIN):
return {
"id": credential_id,
"rawId": base64.urlsafe_b64encode(credential_id.encode())
.decode('ascii'),
"type": "public-key",
"response": {
"clientDataJSON": base64.urlsafe_b64encode(json.dumps({
"type": "webauthn.create",
"challenge": challenge,
"origin": origin
}).encode()).decode('ascii'),
"attestationObject":
base64.urlsafe_b64encode(b"mock_attestation_obj")
.decode('ascii')
}
}
return _make_credential
def test_registration_duplicate_user(client, database):
"""Test that registering an existing username fails appropriately"""
username = f"duplicate_user_{uuid.uuid4()}"
# First registration should succeed
response = client.post("/register/start", json={"username": username})
assert response.status_code == 200, "First registration should succeed"
# Second registration should fail
response = client.post("/register/start", json={"username": username})
assert response.status_code == 400, "Duplicate registration should fail"
assert "User already exists" in response.json()["detail"]
def test_login_nonexistent_user(client):
"""Test that attempting to log in with a non-existent user returns 400"""
response = client.post("/login/start",
json={"username": f"nonexistent_{uuid.uuid4()}"})
assert response.status_code == 400, \
"Should reject login for non-existent user"
assert "User does not exist" in response.json()["detail"]
def test_login_invalid_challenge(
client, user_setup_teardown, mock_credential_factory
):
"""Test that authentication fails with invalid challenge"""
username = user_setup_teardown
# Skip registration since user_setup_teardown already created the user
# Start login process directly
response = client.post("/login/start", json={"username": username})
assert response.status_code == 200, \
"Login start should succeed for registered user"
# Create credential with wrong challenge
mock_credential = mock_credential_factory(
"test_credential_id",
"invalid_challenge" # Wrong challenge
)
# Attempt login completion with invalid challenge
with patch('webauthn.verify_authentication_response') as mock_verify:
mock_verify.side_effect = Exception("Invalid challenge")
response = client.post("/login/complete", json={
"username": username,
"credential": mock_credential
})
assert response.status_code == 400, "Should reject invalid challenge"
# Security-specific tests
def test_token_expiration(client, short_lived_token):
"""Test that tokens expire correctly"""
token = short_lived_token()
# Wait for token to expire
time.sleep(2)
# Try to access protected route
response = client.get("/protected", headers={
"Authorization": f"Bearer {token}"
})
assert response.status_code == 401, "Expired token should be rejected"
def test_token_revocation(client, short_lived_token):
"""Test that revoked tokens cannot be reused"""
token = short_lived_token()
# Revoke token
response = client.post("/token/revoke", headers={
"Authorization": f"Bearer {token}"
})
assert response.status_code == 200
# Try to use revoked token
response = client.get("/protected", headers={
"Authorization": f"Bearer {token}"
})
assert response.status_code == 401, "Revoked token should be rejected"
# Database operation tests
def test_database_operations(database):
"""Test database CRUD operations"""
username = f"db_test_{uuid.uuid4()}"
password_hash = "test_hash"
# Test user creation
assert database.add_user(username, password_hash), \
"User creation should succeed"
# Test user retrieval
user = database.get_user(username)
assert user is not None, "Should retrieve created user"
assert user["username"] == username, "Retrieved username should match"
assert user["password_hash"] == password_hash, \
"Retrieved password hash should match"
# Test challenge operations
challenge = "test_challenge"
assert database.add_challenge(username, challenge), \
"Challenge creation should succeed"
assert database.get_challenge(username) == challenge, \
"Retrieved challenge should match"
# Test cleanup operations
database.clear_challenges()
assert database.get_challenge(username) is None, \
"Challenge should be cleared"
# Performance test
def test_database_performance(database):
"""Test that database operations are performant"""
username = f"perf_test_{uuid.uuid4()}"
password_hash = "test_hash"
# Measure user creation performance
start_time = time.time()
for i in range(100):
database.add_user(f"{username}_{i}", password_hash)
end_time = time.time()
elapsed = end_time - start_time
assert elapsed < 1.0, ("Database operations too slow: "
f"{elapsed}s for 100 operations")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment