Created
June 5, 2025 03:28
-
-
Save jymchng/b2af708e640b6c71d2331e4abae07805 to your computer and use it in GitHub Desktop.
NestJS Inspired Web Framework
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| NestJS-inspired Python ASGI Web Framework | |
| A modern, decorator-based web framework with dependency injection | |
| Supports Pydantic and SQLModel BaseModels for validation and serialization | |
| """ | |
| import asyncio | |
| import inspect | |
| import json | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass | |
| from functools import wraps | |
| from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, get_origin, get_args | |
| from urllib.parse import parse_qs, urlparse | |
| try: | |
| from pydantic import BaseModel, ValidationError | |
| PYDANTIC_AVAILABLE = True | |
| except ImportError: | |
| PYDANTIC_AVAILABLE = False | |
| BaseModel = None | |
| ValidationError = None | |
# Type definitions
# Generic type variable; in this file it annotates DIContainer.register/get
# and the Injectable decorator so a class and its instance share one type.
T = TypeVar('T')
# Alias for "any callable". NOTE(review): no other use of this alias is
# visible in this file -- confirm whether external code relies on it.
Handler = Callable[..., Any]
| # Core exceptions | |
class HttpException(Exception):
    """Base error that pairs an HTTP status code with a human-readable message.

    Attributes:
        status_code: Numeric HTTP status to send back to the client.
        message: Text describing the failure; also passed to ``Exception``.
    """

    def __init__(self, status_code: int, message: str):
        # Initialise the Exception machinery first, then attach the HTTP details.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
class UnauthorizedException(HttpException):
    """HTTP 401 error; the message defaults to "Unauthorized"."""

    def __init__(self, message: str = "Unauthorized"):
        super().__init__(status_code=401, message=message)
class ForbiddenException(HttpException):
    """HTTP 403 error; the message defaults to "Forbidden"."""

    def __init__(self, message: str = "Forbidden"):
        super().__init__(status_code=403, message=message)
class BadRequestException(HttpException):
    """HTTP 400 error; the message defaults to "Bad Request"."""

    def __init__(self, message: str = "Bad Request"):
        super().__init__(status_code=400, message=message)
class ValidationException(BadRequestException):
    """HTTP 400 error that also carries a list of per-field validation errors.

    Attributes:
        errors: The list of error dictionaries handed to the constructor.
    """

    def __init__(self, errors: List[Dict[str, Any]]):
        super().__init__(message="Validation failed")
        self.errors = errors
| # Request/Response classes | |
@dataclass
class Request:
    """Incoming HTTP request as seen by handlers and guards.

    Attributes:
        method: HTTP verb of the request.
        path: Request path used for route matching.
        query_params: Query-string values, one list of strings per name.
        headers: Header names mapped to their values.
        body: Raw request body bytes.
        path_params: Values captured from the route pattern; an empty dict
            is substituted when ``None`` is supplied.
    """

    method: str
    path: str
    query_params: Dict[str, List[str]]
    headers: Dict[str, str]
    body: bytes
    path_params: Dict[str, str] = None

    def __post_init__(self):
        # A None default avoids sharing one mutable dict between instances.
        if self.path_params is None:
            self.path_params = {}

    async def json(self) -> Dict[str, Any]:
        """Parse the request body as JSON.

        Returns:
            Whatever ``json.loads`` produces for the body. The annotation
            says dict, but a body holding a JSON array or scalar yields that
            value instead.

        Raises:
            BadRequestException: The body is not valid UTF-8 or not valid JSON.
        """
        try:
            return json.loads(self.body.decode())
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            raise BadRequestException("Invalid JSON") from exc

    async def validate(self, model_class: Type[BaseModel]) -> BaseModel:
        """Parse the body as JSON and build ``model_class`` from it.

        Args:
            model_class: Pydantic model class to instantiate from the body.

        Returns:
            The validated model instance.

        Raises:
            RuntimeError: Pydantic is not installed.
            BadRequestException: The body is not valid JSON, or is valid JSON
                but not an object.
            ValidationException: The model rejected the data; ``errors`` holds
                one ``{"field", "message", "type"}`` dict per problem.
        """
        if not PYDANTIC_AVAILABLE:
            raise RuntimeError("Pydantic not installed. Install with: pip install pydantic")

        data = await self.json()
        # ``model_class(**data)`` needs a mapping; a JSON array or scalar would
        # raise TypeError and surface as a 500 rather than a client error.
        if not isinstance(data, dict):
            raise BadRequestException("Request body must be a JSON object")

        try:
            return model_class(**data)
        except ValidationError as e:
            errors = [
                {
                    "field": ".".join(str(x) for x in error["loc"]),
                    "message": error["msg"],
                    "type": error["type"],
                }
                for error in e.errors()
            ]
            raise ValidationException(errors) from e
@dataclass
class Response:
    """Outgoing HTTP response produced by a handler or by error handling.

    Attributes:
        body: Payload to send (string, dict, list or model instance).
        status_code: HTTP status code, 200 unless stated otherwise.
        headers: Response headers; an empty dict replaces ``None``.
    """

    body: Union[str, dict, list, BaseModel]
    status_code: int = 200
    headers: Dict[str, str] = None

    def __post_init__(self):
        self.headers = {} if self.headers is None else self.headers

    def to_dict(self) -> Dict[str, Any]:
        """Return the body in a JSON-serialisable container form.

        Pydantic models are dumped with ``model_dump``; any other object that
        exposes ``dict`` (the SQLModel compatibility path) uses that; dicts
        and lists are returned untouched; everything else is wrapped as
        ``{"data": body}``.
        """
        payload = self.body
        if PYDANTIC_AVAILABLE and isinstance(payload, BaseModel):
            return payload.model_dump()
        if hasattr(payload, 'dict'):  # SQLModel compatibility
            return payload.dict()
        if isinstance(payload, (dict, list)):
            return payload
        return {"data": payload}
| # Dependency Injection Container | |
class DIContainer:
    """Small dependency-injection registry keyed by class.

    Classes registered without an instance are constructed on demand, with
    their annotated constructor parameters resolved recursively through the
    same container. Instances supplied to ``register`` and instances of
    classes carrying an ``_injectable`` attribute are cached and reused.
    """

    def __init__(self):
        self._services: Dict[Type, Any] = {}
        self._singletons: Dict[Type, Any] = {}

    def register(self, service_class: Type[T], instance: T = None) -> None:
        """Register a service class, or a ready-made instance for it.

        Args:
            service_class: Key under which the service is looked up.
            instance: Optional pre-built object returned by every ``get``.
        """
        # Compare against None explicitly: a falsy instance (empty container,
        # object defining __bool__/__len__) is still a valid instance.
        if instance is not None:
            self._singletons[service_class] = instance
        else:
            self._services[service_class] = service_class

    def _is_registered(self, service_class: Any) -> bool:
        """Tell whether ``get`` can resolve ``service_class`` at all."""
        return service_class in self._singletons or service_class in self._services

    def get(self, service_class: Type[T]) -> T:
        """Return an instance of ``service_class`` with dependencies injected.

        Args:
            service_class: A class previously passed to ``register``.

        Returns:
            The cached instance if one exists, otherwise a new one.

        Raises:
            ValueError: ``service_class`` (or a required dependency of it)
                was never registered.
        """
        # Check if singleton exists
        if service_class in self._singletons:
            return self._singletons[service_class]

        # Check if service is registered
        if service_class not in self._services:
            # Typing constructs may lack __name__; fall back to repr().
            label = getattr(service_class, '__name__', repr(service_class))
            raise ValueError(f"Service {label} not found")

        # Create instance with dependency injection
        constructor = self._services[service_class]
        empty = inspect.Parameter.empty
        variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        kwargs = {}
        for param_name, param in inspect.signature(constructor.__init__).parameters.items():
            if param_name == 'self' or param.kind in variadic:
                continue
            if param.annotation is empty:
                continue
            # An unregistered annotation with a default (e.g. ``retries: int = 3``)
            # is ordinary configuration, not a service: let the default apply.
            if param.default is not empty and not self._is_registered(param.annotation):
                continue
            kwargs[param_name] = self.get(param.annotation)

        instance = constructor(**kwargs)

        # Store as singleton if marked as injectable
        if hasattr(constructor, '_injectable'):
            self._singletons[service_class] = instance
        return instance


# Global container instance
container = DIContainer()
| # Decorators | |
def Injectable(cls: Type[T]) -> Type[T]:
    """Class decorator: flag ``cls`` as injectable and add it to the global container.

    The ``_injectable`` flag is what makes the container cache the first
    instance it builds for the class.
    """
    setattr(cls, '_injectable', True)
    container.register(cls)
    return cls
def Controller(prefix: str = ""):
    """Build a class decorator that records the controller's URL prefix.

    The decorated class receives ``_controller_prefix`` (the given prefix)
    and a fresh, empty ``_routes`` list, and is returned unchanged otherwise.
    """
    def attach_metadata(controller_cls):
        controller_cls._routes = []
        controller_cls._controller_prefix = prefix
        return controller_cls

    return attach_metadata
def Get(path: str = ""):
    """Build a decorator that tags a handler as the GET endpoint for ``path``."""
    def mark_route(handler):
        setattr(handler, '_route_path', path)
        setattr(handler, '_route_method', "GET")
        return handler

    return mark_route
def Post(path: str = ""):
    """Build a decorator that tags a handler as the POST endpoint for ``path``."""
    def mark_route(handler):
        setattr(handler, '_route_path', path)
        setattr(handler, '_route_method', "POST")
        return handler

    return mark_route
def Put(path: str = ""):
    """Build a decorator that tags a handler as the PUT endpoint for ``path``."""
    def mark_route(handler):
        setattr(handler, '_route_path', path)
        setattr(handler, '_route_method', "PUT")
        return handler

    return mark_route
def Delete(path: str = ""):
    """Build a decorator that tags a handler as the DELETE endpoint for ``path``."""
    def mark_route(handler):
        setattr(handler, '_route_path', path)
        setattr(handler, '_route_method', "DELETE")
        return handler

    return mark_route
| # Guard interface | |
class Guard(ABC):
    """Interface for request guards; subclasses decide whether a request may proceed."""

    @abstractmethod
    async def can_activate(self, request: Request) -> bool:
        """Return True if request should be allowed"""
        pass


def UseGuards(*guards: Type[Guard]):
    """Attach guard classes to a controller class or to a route handler.

    The example code in this file applies ``@UseGuards(...)`` and route
    registration reads the resulting metadata, but the decorator itself was
    missing. Guards land on ``_controller_guards`` when the target is a
    class and on ``_guards`` otherwise -- the two attributes that route
    registration looks up.

    Args:
        *guards: Guard classes, in the order they should be checked.

    Returns:
        A decorator that returns its target after recording the guards.
    """
    def decorator(target):
        attribute = '_controller_guards' if inspect.isclass(target) else '_guards'
        # Build a new list rather than mutating one that may be inherited or
        # shared. Outer decorators run last, so prepending keeps the stacked
        # decorators' top-to-bottom order.
        already_attached = getattr(target, attribute, [])
        setattr(target, attribute, list(guards) + list(already_attached))
        return target

    return decorator
def Body(model_class: Type[BaseModel] = None):
    """Build a decorator that records which model validates the request body.

    The model class is stored on the handler as ``_body_model``.
    """
    def remember_model(handler):
        setattr(handler, '_body_model', model_class)
        return handler

    return remember_model
def Param(name: str, param_type: Type = str):
    """Build a decorator that records the type of one path parameter.

    Entries accumulate in the handler's ``_param_types`` dict, keyed by
    parameter name, so the decorator can be stacked.
    """
    def record_type(handler):
        try:
            declared = handler._param_types
        except AttributeError:
            # First Param on this handler: start its registry.
            declared = handler._param_types = {}
        declared[name] = param_type
        return handler

    return record_type
def Query(name: str, param_type: Type = str, required: bool = False):
    """Build a decorator that records how one query parameter is validated.

    Each entry in the handler's ``_query_params`` dict maps the parameter
    name to ``{'type': ..., 'required': ...}``; the decorator can be stacked.
    """
    def record_rule(handler):
        try:
            rules = handler._query_params
        except AttributeError:
            # First Query on this handler: start its registry.
            rules = handler._query_params = {}
        rules[name] = {'type': param_type, 'required': required}
        return handler

    return record_rule
| # Route matching | |
class Route:
    """One registered endpoint: HTTP method, path template, handler and guards.

    Attributes:
        method: HTTP verb the route answers to.
        path: Original path template, e.g. ``/users/:id``.
        handler: Callable invoked for matching requests.
        guards: Guard classes to check before the handler (empty if none).
        path_pattern: Anchored regular-expression source built from ``path``.
        param_names: Names of the ``:param`` placeholders, in path order.
    """

    def __init__(self, method: str, path: str, handler: Callable, guards: List[Type[Guard]] = None):
        self.method = method
        self.path = path
        self.handler = handler
        self.guards = guards or []
        self.path_pattern = self._compile_path(path)

    def _compile_path(self, path: str):
        """Convert a path template into an anchored regex source string.

        Each ``:name`` placeholder becomes a ``([^/]+)`` capture group and is
        appended to ``self.param_names``. Text between placeholders is
        regex-escaped so characters such as ``.`` match only themselves.

        Returns:
            The pattern source, wrapped in ``^`` and ``$``.
        """
        import re

        names = []
        pieces = []
        cursor = 0
        # Walk the placeholders by position instead of using str.replace:
        # replacing ``:id`` textually would also corrupt ``:id2``.
        for placeholder in re.finditer(r':([a-zA-Z_][a-zA-Z0-9_]*)', path):
            pieces.append(re.escape(path[cursor:placeholder.start()]))
            pieces.append('([^/]+)')
            names.append(placeholder.group(1))
            cursor = placeholder.end()
        pieces.append(re.escape(path[cursor:]))

        self.param_names = names
        return f"^{''.join(pieces)}$"

    def matches(self, method: str, path: str) -> tuple[bool, Dict[str, str]]:
        """Check if route matches and extract parameters.

        Args:
            method: HTTP verb of the incoming request.
            path: Request path to test against the route pattern.

        Returns:
            ``(True, params)`` with captured path parameters on a match,
            ``(False, {})`` otherwise.
        """
        import re

        if self.method != method:
            return False, {}

        # fullmatch, unlike match with ``$``, rejects a trailing newline.
        found = re.fullmatch(self.path_pattern, path)
        if not found:
            return False, {}

        # Literal text is escaped, so the only groups are the placeholders.
        return True, dict(zip(self.param_names, found.groups()))
| # Main Application class | |
def _is_pydantic_model(candidate: Any) -> bool:
    """Return True when ``candidate`` is a class derived from Pydantic's BaseModel."""
    return bool(
        PYDANTIC_AVAILABLE
        and inspect.isclass(candidate)
        and issubclass(candidate, BaseModel)
    )


def _unwrap_optional(declared_type: Any) -> Any:
    """Reduce ``Optional[X]`` (or ``X | None``) to ``X``; leave other types alone."""
    origin = get_origin(declared_type)
    if origin is Union or getattr(origin, '__name__', None) == 'UnionType':
        concrete = [arg for arg in get_args(declared_type) if arg is not type(None)]
        if len(concrete) == 1:
            return concrete[0]
    return declared_type


def _coerce_text(raw: str, declared_type: Any, source: str) -> Any:
    """Convert one raw URL string to ``declared_type``; bad input becomes a 400."""
    target = _unwrap_optional(declared_type)
    if target is str or target is Any or target is None:
        return raw
    if target is bool:
        # bool("false") is True, so booleans need explicit spellings.
        lowered = raw.strip().lower()
        if lowered in ("1", "true", "yes", "on"):
            return True
        if lowered in ("0", "false", "no", "off"):
            return False
        raise BadRequestException(f"Invalid {source} value: {raw!r}")
    try:
        return target(raw)
    except (TypeError, ValueError) as exc:
        raise BadRequestException(f"Invalid {source} value: {raw!r}") from exc


def _convert_path_param(value: str, param_type: Any) -> Any:
    """Convert a captured path segment to the type declared with ``Param``."""
    return _coerce_text(value, param_type, "path parameter")


def _convert_query_param(values: List[str], param_type: Any) -> Any:
    """Convert query-string values to the type declared with ``Query``.

    A ``list`` / ``List[X]`` declaration converts every supplied value; any
    other declaration converts the first value only.
    """
    target = _unwrap_optional(param_type)
    if target is list or get_origin(target) is list:
        item_types = get_args(target)
        item_type = item_types[0] if item_types else str
        return [_coerce_text(value, item_type, "query parameter") for value in values]
    return _coerce_text(values[0], target, "query parameter")


class Application:
    """ASGI application that routes HTTP requests to registered controllers.

    Attributes:
        routes: Registered routes, tried in registration order.
        global_guards: Guard classes added via ``use_global_guards``.
    """

    def __init__(self):
        self.routes: List[Route] = []
        self.global_guards: List[Type[Guard]] = []

    def use_global_guards(self, *guards: Type[Guard]):
        """Add global guards.

        Routes copy the global guard list when they are registered, so only
        controllers registered after this call are affected.
        """
        self.global_guards.extend(guards)

    def register_controller(self, controller_class: Type):
        """Register a controller and its routes.

        Every attribute of ``controller_class`` carrying ``_route_method``
        (set by the Get/Post/Put/Delete decorators) becomes a route whose
        path is the controller prefix joined with the route path.
        """
        # The per-request handler builds the controller through the container,
        # which only constructs classes it knows about.
        container.register(controller_class)

        controller_prefix = getattr(controller_class, '_controller_prefix', '')
        controller_guards = getattr(controller_class, '_controller_guards', [])

        # Get all methods that are route handlers
        for method_name in dir(controller_class):
            method = getattr(controller_class, method_name)
            if not hasattr(method, '_route_method'):
                continue

            full_path = self._join_paths(controller_prefix, method._route_path)
            method_guards = getattr(method, '_guards', [])
            # Combine guards (global -> controller -> method)
            all_guards = list(self.global_guards) + list(controller_guards) + list(method_guards)

            route_handler = self._make_route_handler(controller_class, method)
            self.routes.append(Route(method._route_method, full_path, route_handler, all_guards))

    @staticmethod
    def _join_paths(prefix: str, route_path: str) -> str:
        """Join prefix and route path into one path with a leading slash."""
        full_path = f"{prefix.rstrip('/')}/{route_path.lstrip('/')}"
        if full_path.endswith('/') and len(full_path) > 1:
            full_path = full_path[:-1]
        if not full_path.startswith('/'):
            full_path = '/' + full_path
        return full_path

    def _make_route_handler(self, controller_class: Type, endpoint: Callable) -> Callable:
        """Wrap a controller method in an ``async (request) -> Response`` callable."""
        # The signature never changes, so inspect it once rather than per request.
        signature = inspect.signature(endpoint)

        async def route_handler(request: Request) -> Response:
            controller_instance = container.get(controller_class)
            kwargs = await self._resolve_arguments(endpoint, signature, request)
            result = endpoint(controller_instance, **kwargs)
            # Accept plain (non-async) handlers as well as coroutines.
            if inspect.isawaitable(result):
                result = await result
            return self._to_response(result)

        return route_handler

    async def _resolve_arguments(self, endpoint: Callable, signature: inspect.Signature,
                                 request: Request) -> Dict[str, Any]:
        """Build the keyword arguments for ``endpoint`` from the request."""
        parameters = signature.parameters
        kwargs: Dict[str, Any] = {}

        # Handle request parameter
        if 'request' in parameters:
            kwargs['request'] = request

        # Handle body validation
        body_model = getattr(endpoint, '_body_model', None)
        if body_model:
            if _is_pydantic_model(body_model):
                body_value = await request.validate(body_model)
            else:
                # Non-Pydantic declarations (such as the example's ``dict``
                # fallback) receive the parsed JSON unchanged.
                body_value = await request.json()
            # The body goes to the first parameter annotated with the model.
            for param_name, param in parameters.items():
                if param.annotation == body_model:
                    kwargs[param_name] = body_value
                    break

        # Handle path parameters with type conversion
        param_types = getattr(endpoint, '_param_types', {})
        for param_name in parameters:
            if param_name in request.path_params:
                kwargs[param_name] = _convert_path_param(
                    request.path_params[param_name],
                    param_types.get(param_name, str),
                )

        # Handle query parameters
        for param_name, config in getattr(endpoint, '_query_params', {}).items():
            values = request.query_params.get(param_name, [])
            if config['required'] and not values:
                raise BadRequestException(f"Missing required query parameter: {param_name}")
            if values:
                kwargs[param_name] = _convert_query_param(values, config['type'])

        # Inject dependencies
        for param_name, param in parameters.items():
            if param_name in kwargs or param_name in ('self', 'request'):
                continue
            annotation = param.annotation
            # Only real classes can be container services; this also skips
            # typing constructs such as Optional[int].
            if annotation is inspect.Parameter.empty or not inspect.isclass(annotation):
                continue
            try:
                kwargs[param_name] = container.get(annotation)
            except ValueError:
                # Deliberate best effort: an unregistered type leaves the
                # parameter to its default (or to fail at call time).
                pass
        return kwargs

    @staticmethod
    def _to_response(result: Any) -> Response:
        """Normalise a handler's return value into a Response."""
        if isinstance(result, Response):
            return result
        is_model = PYDANTIC_AVAILABLE and isinstance(result, BaseModel)
        if is_model or isinstance(result, (dict, list)):
            return Response(result, headers={'Content-Type': 'application/json'})
        return Response(str(result))

    async def _check_guards(self, guards: List[Type[Guard]], request: Request) -> bool:
        """Check all guards for a request; stop at the first refusal."""
        for guard_class in guards:
            guard_instance = container.get(guard_class)
            if not await guard_instance.can_activate(request):
                return False
        return True

    async def _handle_request(self, request: Request) -> Response:
        """Dispatch the request to the first matching route.

        Raises:
            UnauthorizedException: A guard on the matched route refused.
        """
        # Find matching route
        for route in self.routes:
            matches, path_params = route.matches(request.method, request.path)
            if matches:
                request.path_params = path_params
                # Check guards
                if not await self._check_guards(route.guards, request):
                    raise UnauthorizedException()
                # Call handler
                return await route.handler(request)
        # No route found
        return Response("Not Found", status_code=404)

    @staticmethod
    def _error_response(message: str, status_code: int, **extra: Any) -> Response:
        """Build the JSON error envelope used for all failures."""
        payload = {'error': message, 'statusCode': status_code}
        payload.update(extra)
        return Response(
            payload,
            status_code=status_code,
            headers={'Content-Type': 'application/json'},
        )

    @staticmethod
    async def _read_request(scope, receive) -> Request:
        """Assemble a Request from the ASGI scope and the body messages."""
        # errors='replace' keeps malformed bytes from raising before the
        # request reaches the error handling in __call__.
        query_string = scope.get('query_string', b'').decode('utf-8', errors='replace')
        headers = {
            key.decode('utf-8', errors='replace'): value.decode('utf-8', errors='replace')
            for key, value in scope.get('headers', [])
        }
        # Parse query parameters
        query_params = parse_qs(query_string) if query_string else {}

        # Read body: collect chunks and join once instead of repeated bytes +=.
        chunks = []
        while True:
            message = await receive()
            if message['type'] == 'http.request':
                chunks.append(message.get('body', b''))
            if not message.get('more_body', False):
                break

        return Request(
            method=scope['method'],
            path=scope['path'],
            query_params=query_params,
            headers=headers,
            body=b''.join(chunks),
        )

    @staticmethod
    def _encode_body(response: Response) -> bytes:
        """Serialise ``response.body`` to bytes, defaulting Content-Type for JSON."""
        body = response.body
        is_json = True
        if isinstance(body, (dict, list)):
            body = json.dumps(body)
        elif PYDANTIC_AVAILABLE and isinstance(body, BaseModel):
            body = body.model_dump_json()
        elif hasattr(body, 'json'):  # SQLModel compatibility
            body = body.json()
        else:
            is_json = False

        has_content_type = any(name.lower() == 'content-type' for name in response.headers)
        if is_json and not has_content_type:
            response.headers['Content-Type'] = 'application/json'

        if body is None:
            return b''
        if isinstance(body, str):
            return body.encode()
        if isinstance(body, (bytes, bytearray, memoryview)):
            return bytes(body)
        # Any other value is sent as its string form.
        return str(body).encode()

    async def __call__(self, scope, receive, send):
        """ASGI application callable.

        Non-HTTP scopes are ignored. HTTP requests are dispatched, and any
        exception is converted into a JSON error response.
        """
        if scope['type'] != 'http':
            return

        request = await self._read_request(scope, receive)

        try:
            response = await self._handle_request(request)
        except ValidationException as e:
            # Must precede HttpException: ValidationException is a subclass,
            # and the generic branch would drop the per-field details.
            response = self._error_response('Validation failed', 400, details=e.errors)
        except HttpException as e:
            response = self._error_response(e.message, e.status_code)
        except Exception:
            # Last-resort boundary: record the traceback instead of hiding it.
            import logging
            logging.getLogger(__name__).exception(
                "Unhandled error for %s %s", request.method, request.path
            )
            response = self._error_response('Internal Server Error', 500)

        try:
            response_body = self._encode_body(response)
        except (TypeError, ValueError):
            # json.dumps raises these for unserialisable or circular payloads.
            import logging
            logging.getLogger(__name__).exception(
                "Could not serialise response for %s %s", request.method, request.path
            )
            response = self._error_response('Internal Server Error', 500)
            response_body = self._encode_body(response)

        # Send response
        await send({
            'type': 'http.response.start',
            'status': response.status_code,
            # The ASGI spec asks for lower-cased header names.
            'headers': [[k.lower().encode(), v.encode()] for k, v in response.headers.items()],
        })
        await send({
            'type': 'http.response.body',
            'body': response_body,
        })
| # Factory function to create application | |
def create_app() -> Application:
    """Factory: build and return a fresh, empty Application."""
    application = Application()
    return application
| # Example usage | |
if __name__ == "__main__":
    # Demonstration: declares DTOs, a service, a guard and three controllers,
    # registers them on an Application and prints usage hints. No server is
    # started here.

    # Pydantic models for validation
    if PYDANTIC_AVAILABLE:
        # BaseModel is already imported at module level when Pydantic is
        # available, so no further import is needed here.
        class CreateUserDto(BaseModel):
            name: str
            email: str
            age: Optional[int] = None

        class UpdateUserDto(BaseModel):
            name: Optional[str] = None
            email: Optional[str] = None
            age: Optional[int] = None

        class UserResponse(BaseModel):
            id: int
            name: str
            email: str
            age: Optional[int] = None
    else:
        # Fallback if Pydantic not available
        CreateUserDto = dict
        UpdateUserDto = dict
        UserResponse = dict

    # Example service
    @Injectable
    class UserService:
        """In-memory user store backing the example controllers."""

        def __init__(self):
            self.users = [
                {"id": 1, "name": "John Doe", "email": "[email protected]", "age": 30},
                {"id": 2, "name": "Jane Smith", "email": "[email protected]", "age": 25}
            ]
            self.next_id = 3

        async def get_all_users(self) -> List[dict]:
            return self.users

        async def get_user_by_id(self, user_id: int) -> dict:
            user = next((u for u in self.users if u["id"] == user_id), None)
            if not user:
                raise HttpException(404, "User not found")
            return user

        async def create_user(self, user_data: CreateUserDto) -> dict:
            if PYDANTIC_AVAILABLE:
                user_dict = user_data.model_dump()
            else:
                user_dict = user_data
            new_user = {"id": self.next_id, **user_dict}
            self.users.append(new_user)
            self.next_id += 1
            return new_user

        async def update_user(self, user_id: int, user_data: UpdateUserDto) -> dict:
            user = await self.get_user_by_id(user_id)
            if PYDANTIC_AVAILABLE:
                # exclude_unset: only fields the client actually sent are applied.
                update_dict = user_data.model_dump(exclude_unset=True)
            else:
                update_dict = {k: v for k, v in user_data.items() if v is not None}
            user.update(update_dict)
            return user

        async def delete_user(self, user_id: int) -> bool:
            user = await self.get_user_by_id(user_id)
            self.users.remove(user)
            return True

    # Example guard
    @Injectable
    class AuthGuard(Guard):
        """Allows requests whose authorization header starts with 'Bearer '."""

        async def can_activate(self, request: Request) -> bool:
            # Simple token check
            auth_header = request.headers.get('authorization', '')
            return auth_header.startswith('Bearer ')

    # Example controller with Pydantic validation
    @Controller('/users')
    class UserController:
        def __init__(self, user_service: UserService):
            self.user_service = user_service

        @Get()
        async def get_users(self) -> List[dict]:
            """Get all users"""
            return await self.user_service.get_all_users()

        @Get('/:id')
        @Param('id', int)
        async def get_user(self, id: int) -> dict:
            """Get user by ID with automatic int conversion"""
            return await self.user_service.get_user_by_id(id)

        @Post()
        @UseGuards(AuthGuard)
        @Body(CreateUserDto)
        async def create_user(self, body: CreateUserDto) -> dict:
            """Create user with automatic validation"""
            return await self.user_service.create_user(body)

        @Put('/:id')
        @UseGuards(AuthGuard)
        @Param('id', int)
        @Body(UpdateUserDto)
        async def update_user(self, id: int, body: UpdateUserDto) -> dict:
            """Update user with validation"""
            return await self.user_service.update_user(id, body)

        @Delete('/:id')
        @UseGuards(AuthGuard)
        @Param('id', int)
        async def delete_user(self, id: int) -> dict:
            """Delete user"""
            await self.user_service.delete_user(id)
            return {"message": "User deleted successfully"}

    # Search controller with query params
    @Controller('/search')
    class SearchController:
        def __init__(self, user_service: UserService):
            self.user_service = user_service

        @Get('/users')
        @Query('q', str, required=True)
        @Query('limit', int, required=False)
        async def search_users(self, q: str, limit: Optional[int] = 10):
            """Search users with query parameters"""
            users = await self.user_service.get_all_users()
            # Simple search by name
            filtered = [u for u in users if q.lower() in u['name'].lower()]
            if limit:
                filtered = filtered[:limit]
            return {
                "query": q,
                "limit": limit,
                "results": filtered,
                "count": len(filtered)
            }

    # Public controller (no auth required)
    @Controller('/health')
    class HealthController:
        @Get()
        async def health_check(self):
            return {"status": "ok", "timestamp": "2025-06-05T00:00:00Z"}

    # Create and configure app
    app = create_app()
    app.register_controller(UserController)
    app.register_controller(SearchController)
    app.register_controller(HealthController)

    # To run with uvicorn:
    # uvicorn main:app --reload
    # NOTE(review): ``app`` is created only when this file runs as a script.
    # An ASGI server that imports the module will not see it unless this
    # example moves outside the ``__main__`` guard.
    print("Framework created with Pydantic/SQLModel support!")
    print("Example controllers registered with validation.")
    print("To run: uvicorn <filename>:app --reload")
    if not PYDANTIC_AVAILABLE:
        print("\nNote: Install Pydantic for full validation support:")
        print("pip install pydantic")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment