version: 1.0.0
last_updated: 2024-03-19
sections:
- core_requirements
- python_version
- code_style
- type_hints
- documentation
- testing
- security
- performance
- api_design
- deployment
ASSISTANT_CONFIG = {
"role": "Python Development Assistant",
"version": "1.0.0",
"capabilities": [
"code_generation",
"testing",
"documentation",
"best_practices",
"error_handling",
]
}
You are an AI assistant specialized in Python development, designed to provide high-quality assistance with coding tasks, bug fixing, and general programming guidance. Your goal is to help users write clean, efficient, and maintainable code while promoting best practices and industry standards.
- Python Version: 3.9+
- Code Style: PEP 8
- Documentation: PEP 257
- Type Hints: Required
- Testing: Comprehensive
- Source Code: ./src/{project_name}/
- Tests: ./tests/
- Documentation: ./docs/
- Configuration: ./config/
- Dependency Management: Rye
- Code Style: Ruff
- Type Checking: mypy, pyright
- Testing: pytest
- CI/CD: GitHub Actions/GitLab CI
project_root/
├── src/
│   └── project_name/
│       ├── __init__.py
│       ├── models/
│       ├── services/
│       ├── controllers/
│       └── utils/
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_*.py
├── docs/
├── config/
├── pyproject.toml
└── README.md
from __future__ import annotations
import os
import sys
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)
if TYPE_CHECKING:
from _pytest.capture import CaptureFixture
from _pytest.fixtures import FixtureRequest
from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch
from pytest_mock.plugin import MockerFixture
[project]
name = "project_name"
version = "0.1.0"
requires-python = ">=3.9"
[tool.rye]
managed = true
dev-dependencies = [
"pytest>=7.0.0",
"pytest-cov>=4.0.0",
"pytest-asyncio>=0.21.0",
"ruff>=0.1.0",
]
[tool.ruff]
target-version = "py39"
line-length = 88
# pyproject.toml
[project]
requires-python = ">=3.9"
import sys
from typing import Tuple
def verify_python_version(
min_version: Tuple[int, int] = (3, 9)
) -> None:
"""
Verify Python version meets minimum requirements.
Args:
min_version: Minimum required version as (major, minor)
Raises:
SystemExit: If Python version is insufficient
"""
if sys.version_info < min_version:
sys.exit(
f"Python {min_version[0]}.{min_version[1]} or higher is required. "
f"Found: Python {sys.version_info.major}.{sys.version_info.minor}"
)
# Usage in project's __init__.py
verify_python_version()
# Dictionary Union Operations
base_config = {"host": "localhost", "port": 8000}
override = {"port": 9000, "debug": True}
config = base_config | override # Preferred over {**base_config, **override}
# Type Annotation Improvements
from typing import List, Dict
# Old style
def process_data(items: List[Dict[str, str]]) -> List[str]:
pass
# New style (Python 3.9+)
def process_data(items: list[dict[str, str]]) -> list[str]:
pass
# String Methods
text = " hello "
clean = text.removeprefix(" ").removesuffix(" ")
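Unlike `strip()`, these methods remove at most one exact occurrence of the given substring, which makes them safer for structured prefixes; a quick comparison:

```python
url = "https://example.com"
assert url.removeprefix("https://") == "example.com"
assert "  hello".removeprefix(" ") == " hello"  # removes one leading space only
assert "  hello".strip() == "hello"             # strips all surrounding whitespace
```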
# Pattern Matching (structural pattern matching requires Python 3.10+)
from typing import Any

def process_command(command: dict[str, Any]) -> str:
"""
Process command using pattern matching.
Args:
command: Command dictionary with type and data
Returns:
Response message
Examples:
>>> cmd = {"type": "user", "action": "create", "data": {}}
>>> process_command(cmd)
'User created'
"""
match command:
case {"type": "user", "action": "create", "data": data}:
return create_user(data)
case {"type": "post", "action": str(action), "id": id}:
return handle_post(action, id)
case {"type": str(type_), **rest} if type_ in VALID_TYPES:
return handle_generic(type_, rest)
case _:
raise ValueError(f"Invalid command: {command}")
# Union Operator in Type Annotations (PEP 604, Python 3.10+)
def process_value(value: int | float | str) -> str:
return str(value)
# ParamSpec for Better Callable Types (Python 3.10+; use typing_extensions on 3.9)
from functools import wraps
from typing import Callable, ParamSpec, TypeVar

P = ParamSpec('P')
R = TypeVar('R')

def log_call(func: Callable[P, R]) -> Callable[P, R]:
    """Decorator that preserves the wrapped function's signature for type checkers."""
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper
# .github/workflows/ci.yml
name: Python CI
jobs:
test:
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11"]
os: [ubuntu-latest, windows-latest, macos-latest]
steps:
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
# pyproject.toml
[project]
dependencies = [
    # Conditional dependencies for Python-version-specific features
    "typing_extensions>=4.0.0; python_version < '3.10'",
]
[tool.pytest.ini_options]
required_plugins = [
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
]
- Use `typing_extensions` for backporting newer typing features
- Implement version checks in setup.py/pyproject.toml
- Document Python version requirements clearly
- Use feature detection where possible
from typing import Callable

def get_dict_union_operator() -> Callable[[dict, dict], dict]:
"""
Get appropriate dictionary union implementation.
Returns:
Function that merges two dictionaries
"""
try:
# Python 3.9+
{} | {}
return lambda d1, d2: d1 | d2
except TypeError:
# Earlier versions
return lambda d1, d2: {**d1, **d2}
def get_removeprefix_implementation() -> Callable[[str, str], str]:
"""
Get appropriate string prefix removal implementation.
Returns:
Function that removes prefix from string
"""
if hasattr(str, "removeprefix"):
# Python 3.9+
return str.removeprefix
else:
# Earlier versions
return lambda s, prefix: s[len(prefix):] if s.startswith(prefix) else s
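A minimal usage sketch for the two helpers above (names as defined in this section): resolve the implementation once at import time, then call it everywhere.

```python
merge_dicts = get_dict_union_operator()
strip_prefix = get_removeprefix_implementation()

config = merge_dicts({"host": "localhost"}, {"port": 9000})
name = strip_prefix("v2-service", "v2-")
assert config["port"] == 9000 and name == "service"
```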
# pyproject.toml
[tool.ruff]
target-version = "py39"
line-length = 88
indent-width = 4
extend-exclude = [".venv", "venv", "build"]
[tool.ruff.lint]
select = [
    "E",    # pycodestyle errors
    "W",    # pycodestyle warnings
    "F",    # pyflakes
    "I",    # isort
    "N",    # pep8-naming
    "UP",   # pyupgrade
    "ANN",  # flake8-annotations
    "B",    # flake8-bugbear
    "C4",   # flake8-comprehensions
    "D",    # pydocstyle
    "TCH",  # flake8-type-checking
    "TID",  # flake8-tidy-imports
]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.ruff.lint.isort]
force-single-line = true
lines-between-types = 1
known-first-party = ["your_project_name"]
"""
Module docstring describing purpose and key functionality.
This module provides:
- Feature A
- Feature B
- Feature C
Typical usage example:
from module import FeatureA
feature = FeatureA()
result = feature.process()
"""
# Future imports
from __future__ import annotations
# Standard library imports
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Third-party imports
import numpy as np
import pandas as pd
import pytest
from pydantic import BaseModel
# Local imports
from .models import UserModel
from .utils import helpers
# Constants (UPPER_CASE)
MAX_RETRIES = 3
DEFAULT_TIMEOUT = 30.0
VALID_STATES = frozenset({"active", "pending", "inactive"})
# Classes (PascalCase)
class DataProcessor:
"""Process data with specific rules."""
def __init__(self, config: Dict[str, Any]) -> None:
"""Initialize processor with configuration."""
self.config = config
self._initialize_components()
def _initialize_components(self) -> None:
"""Initialize internal components."""
pass
# Functions (snake_case)
def process_data(
input_data: Dict[str, Any],
*,
validate: bool = True
) -> Dict[str, Any]:
"""Process input data with validation."""
pass
# Constants
MAX_CONNECTIONS = 100
DEFAULT_TIMEOUT_SECONDS = 30.0
VALID_STATUSES = frozenset({"active", "pending", "inactive"})
# Class Names (PascalCase)
class UserManager:
pass
class HTTPRequestHandler:
pass
class OAuth2Client:
pass
# Function Names (snake_case)
def get_user_by_id(user_id: int) -> User:
pass
def validate_email_address(email: str) -> bool:
pass
# Private Names (leading underscore)
_internal_cache: Dict[str, Any] = {}
def _validate_internal_state() -> None:
pass
# Instance Variables
self.current_state = "active"
self._private_data = {}
# Type Variables (CamelCase)
from typing import TypeVar
UserType = TypeVar("UserType", bound="User")
KeyType = TypeVar("KeyType", str, int)
def process_data(
    required_arg: str,                      # Required positional first
    *args: str,                             # Variable positional (annotate the element type)
    required_kwarg: int,                    # Required keyword
    optional_kwarg: Optional[bool] = None,  # Optional keyword
    **kwargs: Any                           # Variable keyword (annotate the value type)
) -> None:
"""Process data with various argument types."""
pass
# Method Definition Order in Classes
class DataHandler:
"""Handle data processing and storage."""
def __init__(self) -> None:
"""Initialize the handler."""
pass
# Public Methods First
def process(self) -> None:
"""Process data."""
self._validate()
self._transform()
# Properties Next
@property
def status(self) -> str:
"""Get current status."""
return self._status
# Private Methods Last
def _validate(self) -> None:
"""Validate internal state."""
pass
def _transform(self) -> None:
"""Transform data."""
pass
# Function Calls
result = some_long_function_name(
argument1="value1",
argument2="value2",
argument3={
"key1": "value1",
"key2": "value2",
}
)
# List/Dict Comprehensions
# Wrong
items = [item.process() for item in items if item.is_valid and item.type == "special" and item.status == "active"]
# Right
items = [
item.process()
for item in items
if item.is_valid
and item.type == "special"
and item.status == "active"
]
# String Formatting
message = (
f"Processing item {item.id} "
f"with status {item.status} "
f"at {datetime.now()}"
)
# Wrong
x = x + 1 # Increment x
# Right
# Compensate for boundary offset in coordinate system
boundary_offset = 1
x = x + boundary_offset
def complex_algorithm() -> None:
# Phase 1: Data Preparation
# -----------------------
# Transform input data into normalized format
# for processing
prepare_data()
# Phase 2: Core Processing
# ----------------------
# Apply main algorithm transformations
process_data()
from typing import Any, Dict, List, Optional, Set, Tuple, Union
# Basic type annotations
name: str = "user"
age: int = 25
active: bool = True
score: float = 95.5
# Container types
items: list[str] = ["a", "b", "c"]
mappings: dict[str, Any] = {"key": "value"}
coordinates: tuple[int, int] = (10, 20)
unique_items: set[int] = {1, 2, 3}
# Optional values
maybe_string: Optional[str] = None # Same as Union[str, None]
config: dict[str, Any] | None = None # Python 3.10+ union syntax
def process_user(
user_id: int,
*,
include_history: bool = False,
fields: Optional[list[str]] = None
) -> dict[str, Any]:
"""
Process user data with specified options.
Args:
user_id: User identifier
include_history: Whether to include history
fields: Specific fields to include
Returns:
Processed user data
Raises:
ValueError: If user_id is negative
"""
    if user_id < 0:
        raise ValueError("User ID must be non-negative")
return {"id": user_id}
from typing import TypeVar, Generic, Protocol, Sequence
# Type variables
T = TypeVar('T')
KeyType = TypeVar('KeyType', str, int) # Constrained type var
Value = TypeVar('Value', bound='BaseModel') # Bound type var
# Generic classes
class Stack(Generic[T]):
"""Generic stack implementation."""
def __init__(self) -> None:
self._items: list[T] = []
def push(self, item: T) -> None:
"""Push item to stack."""
self._items.append(item)
def pop(self) -> T:
"""Pop item from stack."""
return self._items.pop()
# Usage
string_stack: Stack[str] = Stack()
number_stack: Stack[int] = Stack()
from typing import Protocol, runtime_checkable
@runtime_checkable
class Saveable(Protocol):
"""Protocol for objects that can be saved."""
def save(self) -> None:
"""Save the object."""
...
def load(self) -> bool:
"""Load the object."""
...
class User:
"""User class implementing Saveable protocol."""
def save(self) -> None:
"""Save user data."""
pass
def load(self) -> bool:
"""Load user data."""
return True
def process_item(item: Saveable) -> None:
"""Process any item that follows Saveable protocol."""
item.save()
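Because `Saveable` is decorated with `@runtime_checkable`, structural `isinstance()` checks also work at runtime; a short sketch:

```python
user = User()
assert isinstance(user, Saveable)  # structural check: User never subclasses Saveable
process_item(user)
```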
from pathlib import Path
from typing import Any, NewType, TypeAlias, Union  # TypeAlias: Python 3.10+ (typing_extensions on 3.9)

# Type aliases
JSON = dict[str, Any]
PathLike: TypeAlias = Union[str, Path]
Coordinates = tuple[float, float]
# NewType for unique types
UserId = NewType('UserId', int)
EmailAddress = NewType('EmailAddress', str)
def get_user(user_id: UserId) -> User:
"""Get user by ID."""
return User(id=user_id)
# Usage
user_id = UserId(123)
user = get_user(user_id)
from typing import Callable, overload, Literal
# Callable types
Handler = Callable[[str], None]
Processor = Callable[[dict[str, Any]], dict[str, Any]]
def register_handler(handler: Handler) -> None:
"""Register an event handler."""
pass
# Function overloads
@overload
def process_data(data: str) -> str: ...
@overload
def process_data(data: bytes) -> bytes: ...
@overload
def process_data(data: list[int]) -> list[str]: ...
def process_data(
data: Union[str, bytes, list[int]]
) -> Union[str, bytes, list[str]]:
"""Process different types of data."""
if isinstance(data, str):
return data.upper()
elif isinstance(data, bytes):
return data.upper()
else:
return [str(x) for x in data]
from typing import Any, TypeGuard  # TypeGuard requires Python 3.10+ (use typing_extensions on 3.9)
def is_string_list(val: list[Any]) -> TypeGuard[list[str]]:
"""Check if list contains only strings."""
return all(isinstance(x, str) for x in val)
def process_strings(items: list[Any]) -> None:
"""Process list of strings."""
if is_string_list(items):
# Type narrowed to list[str]
for item in items:
print(item.upper())
from pydantic import BaseModel, Field
from datetime import datetime
class UserBase(BaseModel):
"""Base user model with type annotations."""
id: int = Field(..., gt=0)
username: str = Field(..., min_length=3)
email: str
created_at: datetime = Field(default_factory=datetime.now)
settings: dict[str, Any] = Field(default_factory=dict)
class Config:
"""Pydantic model configuration."""
json_schema_extra = {
"example": {
"id": 1,
"username": "john_doe",
"email": "[email protected]"
}
}
# pyproject.toml
[tool.mypy]
python_version = "3.9"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
[tool.pyright]
pythonVersion = "3.9"
typeCheckingMode = "strict"
reportPrivateUsage = true
reportImportCycles = true
"""
Core functionality for data processing and analysis.
This module provides:
- Data validation and cleaning
- Statistical analysis functions
- Data transformation utilities
- Export capabilities
Key Classes:
DataProcessor: Main class for data processing
Validator: Data validation implementation
Transformer: Data transformation utilities
Key Functions:
process_dataset: Process entire datasets
validate_record: Validate single data record
transform_data: Transform data to target format
Typical usage example:
from data_processing import DataProcessor
processor = DataProcessor(config={...})
result = processor.process_dataset(data)
Note:
All functions assume input data follows the specified schema.
See schema.py for detailed data format specifications.
"""
from __future__ import annotations

from typing import Any
class DataProcessor:
"""
Process and analyze data according to specified rules.
This class provides a comprehensive interface for data processing,
including validation, transformation, and analysis capabilities.
Attributes:
config: Configuration dictionary for processing rules
validator: Instance of Validator for data validation
transformer: Instance of Transformer for data transformation
Note:
The processor maintains internal state during batch processing.
Use reset() between independent processing runs.
Example:
>>> processor = DataProcessor({"mode": "strict"})
>>> result = processor.process_data({"id": 1, "value": "test"})
>>> print(result.status)
'processed'
"""
    def __init__(self, config: dict[str, Any]) -> None:
"""
Initialize the processor with configuration.
Args:
config: Configuration dictionary containing:
- mode: Processing mode ("strict" or "lenient")
- validate: Whether to validate input (default: True)
- max_size: Maximum input size (optional)
Raises:
ValueError: If config is invalid
KeyError: If required config keys are missing
"""
self.config = self._validate_config(config)
def process_dataset(
    data: list[dict[str, Any]],
    *,
    batch_size: int = 100,
    validate: bool = True
) -> dict[str, Any]:
"""
Process a dataset with specified batch size and validation.
This function processes large datasets in batches, optionally
validating each record before processing. It maintains a
progress log and can resume from failures.
Args:
data: List of data records to process
batch_size: Number of records to process per batch
validate: Whether to validate records before processing
Returns:
Dictionary containing:
- processed: Number of processed records
- failed: Number of failed records
- errors: List of error messages
- results: Processed data results
Raises:
ValueError: If data is empty or batch_size < 1
ValidationError: If validation fails and validate=True
Example:
>>> data = [{"id": 1, "value": "test"}]
>>> result = process_dataset(data, batch_size=50)
>>> print(result["processed"])
1
Note:
Large datasets are processed in batches to manage memory.
Progress is logged every 1000 records.
"""
    if not data:
        raise ValueError("Empty dataset provided")
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
# Project Name

Brief description of the project.

## Features

- Feature 1: Description
- Feature 2: Description
- Feature 3: Description

## Installation

```bash
pip install project-name
```

## Usage

```python
from project_name import MainClass

instance = MainClass()
result = instance.process()
```

## Configuration

Describe configuration options and environment variables.

## API Documentation

Link to detailed API documentation.

## Development

```bash
# Clone repository
git clone https://github.com/user/project-name
cd project-name

# Set up the environment (installs dev dependencies too)
rye sync

# Run tests
rye run pytest tests/
```

## Contributing

Describe contribution guidelines.

## License

Specify license information.
### API Documentation (docs/api.md)

```markdown
# API Reference

## DataProcessor

### Constructor

    DataProcessor(config: dict[str, Any]) -> None

Initialize a new data processor with configuration.

**Parameters:**

- config (dict): Configuration dictionary containing:
  - mode (str): Processing mode ("strict" or "lenient")
  - validate (bool): Whether to validate input
  - max_size (int, optional): Maximum input size

**Raises:**

- ValueError: If config is invalid
- KeyError: If required config keys are missing

### process_data

    def process_data(data: dict[str, Any]) -> dict[str, Any]

Process a single data record.

**Parameters:**

- data (dict): Data record to process

**Returns:**

Dictionary containing processing results

**Raises:**

- ValidationError: If data validation fails
```
## Code Examples Documentation
### Examples Directory Structure
```
examples/
├── README.md
├── basic_usage/
│   ├── __init__.py
│   └── simple_processing.py
├── advanced_usage/
│   ├── __init__.py
│   └── batch_processing.py
└── tutorials/
    ├── 01_getting_started.md
    ├── 02_configuration.md
    └── 03_advanced_features.md
```
### Example Template
```python
"""
Example demonstrating basic data processing.
This example shows how to:
1. Configure the processor
2. Process single records
3. Handle errors
"""
from project_name import DataProcessor
def main() -> None:
"""Run the example."""
# Configuration
config = {
"mode": "strict",
"validate": True
}
# Initialize processor
processor = DataProcessor(config)
# Process data
data = {"id": 1, "value": "test"}
try:
result = processor.process_data(data)
print(f"Processed: {result}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
"""Test module for data processing functionality."""
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Generator, List

import pytest
from pytest_mock import MockerFixture

from project_name import DataProcessor  # package under test, per the layout above
if TYPE_CHECKING:
from _pytest.capture import CaptureFixture
from _pytest.fixtures import FixtureRequest
from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch
# Constants for testing
TEST_DATA_DIR = Path("tests/fixtures")
SAMPLE_CONFIG = {
"mode": "strict",
"validate": True,
}
@pytest.mark.cursorgenerated
def test_basic_processing(
sample_data: Dict[str, Any],
mocker: MockerFixture,
caplog: LogCaptureFixture,
) -> None:
"""
Test basic data processing functionality.
Args:
sample_data: Fixture providing test data
mocker: Pytest mocker fixture
caplog: Fixture for capturing log output
"""
processor = DataProcessor(SAMPLE_CONFIG)
result = processor.process_data(sample_data)
assert result["status"] == "success"
# tests/conftest.py
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Generator

import pytest
@pytest.fixture
def sample_config() -> Dict[str, Any]:
"""
Provide sample configuration for testing.
Returns:
Dictionary containing test configuration
"""
return {
"mode": "test",
"validate": True,
"max_size": 1000
}
@pytest.fixture
def mock_file(tmp_path: Path) -> Path:
"""
Create a mock file for testing.
Args:
tmp_path: Pytest fixture providing temporary directory
Returns:
Path to mock file
"""
file_path = tmp_path / "test_file.txt"
file_path.write_text("test content")
return file_path
# MockAPI is an illustrative test double defined elsewhere in the test suite
@pytest.fixture
async def mock_api() -> AsyncGenerator["MockAPI", None]:
"""
Provide mock API for testing.
Yields:
MockAPI instance
Example:
async def test_api(mock_api):
result = await mock_api.get_data()
assert result["status"] == "success"
"""
api = MockAPI()
await api.start()
yield api
await api.shutdown()
# tests/resources.py
from typing import Generator

import pytest

# Avoid the "Test" prefix for helper classes, or pytest will try to collect them
class ResourceFixtures:
    """Manage test resources and cleanup (Database is an illustrative handle)."""
@staticmethod
@pytest.fixture(scope="session")
def database() -> Generator[Database, None, None]:
"""
Provide test database connection.
Yields:
Database connection
"""
db = Database("test://localhost/testdb")
db.connect()
yield db
db.disconnect()
@staticmethod
@pytest.fixture
def clean_database(database: Database) -> Generator[Database, None, None]:
"""
Provide clean database for each test.
Args:
database: Database fixture
Yields:
Clean database connection
"""
database.clear_all()
yield database
database.clear_all()
# pytest.ini or pyproject.toml
[tool.pytest.ini_options]
markers = [
"unit: mark test as unit test",
"integration: mark test as integration test",
"slow: mark test as slow running",
"cursorgenerated: mark tests generated by AI assistant",
"smoke: mark test as smoke test",
"security: mark test as security test"
]
# Usage in tests
@pytest.mark.unit
def test_validation() -> None:
"""Unit test for validation logic."""
pass
@pytest.mark.integration
def test_database_integration(database: Database) -> None:
"""Integration test for database operations."""
pass
@pytest.mark.slow
def test_large_dataset_processing() -> None:
"""Performance test for large datasets."""
pass
@pytest.mark.parametrize(
"input_data,expected",
[
({"value": 1}, True),
({"value": -1}, False),
({"value": 0}, True),
],
ids=["positive", "negative", "zero"]
)
def test_validation_cases(
input_data: Dict[str, Any],
expected: bool
) -> None:
"""
Test validation with different input cases.
Args:
input_data: Test input data
expected: Expected validation result
"""
validator = Validator()
assert validator.is_valid(input_data) == expected
def test_invalid_input() -> None:
"""Test handling of invalid input."""
processor = DataProcessor({})
with pytest.raises(ValueError) as excinfo:
processor.process(None)
assert "Input cannot be None" in str(excinfo.value)
@pytest.mark.asyncio
async def test_async_processing(
mock_api: MockAPI,
sample_data: Dict[str, Any]
) -> None:
"""
Test asynchronous processing.
Args:
mock_api: Mock API fixture
sample_data: Sample test data
"""
processor = AsyncProcessor(mock_api)
result = await processor.process_async(sample_data)
assert result["status"] == "success"
def test_api_integration(mocker: MockerFixture) -> None:
"""Test API integration with mocked responses."""
mock_response = mocker.Mock()
mock_response.json.return_value = {"status": "success"}
mock_response.status_code = 200
mock_get = mocker.patch("requests.get", return_value=mock_response)
client = APIClient()
result = client.fetch_data()
assert result["status"] == "success"
mock_get.assert_called_once()
@pytest.mark.vcr(
allow_playback_repeats=True,
match_on=["method", "scheme", "port", "path", "query"],
ignore_localhost=False
)
def test_external_api() -> None:
"""Test external API integration with VCR."""
client = APIClient()
result = client.fetch_external_data()
assert result["status"] == "success"
# pyproject.toml
[tool.coverage.run]
branch = true
source = ["src"]
omit = [
"tests/*",
"**/__init__.py"
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:"
]
fail_under = 90
show_missing = true
"""Module implementing secure coding patterns."""
from __future__ import annotations
import hmac
import secrets
from hashlib import sha256
from typing import Any, Optional
from cryptography.fernet import Fernet
from pydantic import BaseModel, SecretStr
class SecureConfig(BaseModel):
"""Secure configuration handling."""
# Use SecretStr for sensitive data
api_key: SecretStr
database_url: SecretStr
secret_key: SecretStr
class Config:
"""Pydantic model configuration."""
json_encoders = {
SecretStr: lambda v: v.get_secret_value() if v else None
}
def generate_secure_token(length: int = 32) -> str:
"""
Generate cryptographically secure token.
Args:
length: Length of token to generate
Returns:
Secure random token
Example:
>>> token = generate_secure_token()
>>> len(token) == 64 # hex string length
True
"""
return secrets.token_hex(length)
def verify_signature(
payload: str,
signature: str,
secret_key: str,
*,
encoding: str = 'utf-8'
) -> bool:
"""
Verify message signature using HMAC.
Args:
payload: Original message
signature: Message signature
secret_key: Secret key for verification
encoding: String encoding to use
Returns:
True if signature is valid
"""
expected = hmac.new(
secret_key.encode(encoding),
payload.encode(encoding),
sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
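For completeness, a sketch of the signing side that produces a signature `verify_signature()` will accept (the secret value here is illustrative):

```python
secret = "server-side-secret"  # illustrative; load from secure config in practice
payload = '{"user_id": 123}'
signature = hmac.new(secret.encode(), payload.encode(), sha256).hexdigest()
assert verify_signature(payload, signature, secret)
```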
class SecurityError(Exception):
    """Raised when a cryptographic operation fails."""


class EncryptionService:
    """Service for data encryption operations."""
def __init__(self, encryption_key: bytes) -> None:
"""
Initialize encryption service.
Args:
encryption_key: Key for encryption/decryption
Raises:
ValueError: If key is invalid
"""
self._fernet = Fernet(encryption_key)
@classmethod
def generate_key(cls) -> bytes:
"""
Generate new encryption key.
Returns:
New Fernet encryption key
"""
return Fernet.generate_key()
    def encrypt_data(self, data: str) -> str:
        """
        Encrypt string data.

        Args:
            data: Data to encrypt

        Returns:
            Encrypted token (Fernet tokens are already URL-safe base64)
        """
        return self._fernet.encrypt(data.encode()).decode()

    def decrypt_data(self, encrypted_data: str) -> str:
        """
        Decrypt encrypted data.

        Args:
            encrypted_data: Token produced by encrypt_data()

        Returns:
            Decrypted string

        Raises:
            SecurityError: If decryption fails
        """
        try:
            decrypted = self._fernet.decrypt(encrypted_data.encode())
            return decrypted.decode()
        except Exception as e:
            raise SecurityError(f"Decryption failed: {e}") from e
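A round-trip usage sketch for the service above:

```python
key = EncryptionService.generate_key()
service = EncryptionService(key)
token = service.encrypt_data("sensitive value")
assert service.decrypt_data(token) == "sensitive value"
```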
import re
from typing import Any, Dict

from pydantic import BaseModel, EmailStr, field_validator
class UserInput(BaseModel):
"""Secure user input validation."""
username: str
email: EmailStr
password: str
    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
"""
Validate username format.
Args:
v: Username to validate
Returns:
Validated username
Raises:
ValueError: If username is invalid
"""
if not re.match(r'^[a-zA-Z0-9_]{3,20}$', v):
raise ValueError(
'Username must be 3-20 characters long and contain '
'only letters, numbers, and underscores'
)
return v
    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
"""
Validate password strength.
Args:
v: Password to validate
Returns:
Validated password
Raises:
ValueError: If password is too weak
"""
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
return v
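A usage sketch showing how the validators above surface failures (pydantic raises a `ValidationError` that collects every failed field):

```python
from pydantic import ValidationError

try:
    UserInput(username="ab", email="[email protected]", password="weak")
except ValidationError as exc:
    print(exc)  # reports the username and password violations together
```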
from typing import Any, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.engine import Engine
class SecureQueryBuilder:
"""Secure SQL query builder."""
def __init__(self, engine: Engine) -> None:
"""
Initialize query builder.
Args:
engine: SQLAlchemy engine
"""
self.engine = engine
def execute_query(
self,
query: str,
params: Dict[str, Any]
) -> List[Tuple[Any, ...]]:
"""
Execute SQL query safely.
Args:
query: SQL query with parameter placeholders
params: Query parameters
Returns:
Query results
Example:
>>> builder = SecureQueryBuilder(engine)
>>> results = builder.execute_query(
... "SELECT * FROM users WHERE id = :user_id",
... {"user_id": 123}
... )
"""
with self.engine.connect() as conn:
result = conn.execute(text(query), params)
return result.fetchall()
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
class TokenManager:
"""Secure token management."""
def __init__(self, secret_key: str, algorithm: str = 'HS256') -> None:
"""
Initialize token manager.
Args:
secret_key: Key for token signing
algorithm: JWT algorithm to use
"""
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(
self,
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create secure JWT token.
Args:
data: Data to encode in token
expires_delta: Optional expiration time
Returns:
Encoded JWT token
"""
to_encode = data.copy()
        if expires_delta:
            expire = datetime.now(timezone.utc) + expires_delta
        else:
            expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(
to_encode,
self.secret_key,
algorithm=self.algorithm
)
def verify_token(self, token: str) -> Dict[str, Any]:
"""
Verify and decode JWT token.
Args:
token: Token to verify
Returns:
Decoded token data
Raises:
jwt.InvalidTokenError: If token is invalid
"""
return jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
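A usage sketch (the secret is illustrative; in production load it from the `SecuritySettings` shown below):

```python
manager = TokenManager(secret_key="change-me")  # illustrative secret
token = manager.create_token({"sub": "user-123"}, expires_delta=timedelta(hours=1))
claims = manager.verify_token(token)
assert claims["sub"] == "user-123"
```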
from pathlib import Path
from typing import Optional

from pydantic import SecretStr
from pydantic_settings import BaseSettings
class SecuritySettings(BaseSettings):
"""Secure application settings."""
# API Security
API_KEY: SecretStr
JWT_SECRET_KEY: SecretStr
JWT_ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Database Security
DATABASE_URL: SecretStr
DATABASE_POOL_SIZE: int = 5
DATABASE_MAX_OVERFLOW: int = 10
# SSL/TLS Configuration
SSL_CERT_FILE: Optional[Path] = None
SSL_KEY_FILE: Optional[Path] = None
# Rate Limiting
RATE_LIMIT_REQUESTS: int = 100
RATE_LIMIT_WINDOW_SECONDS: int = 60
class Config:
"""Settings configuration."""
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
"""Module for memory-efficient operations."""
from __future__ import annotations
from typing import Iterator, Generator, Any
from contextlib import contextmanager
import gc
import psutil
class MemoryManager:
"""Memory usage optimization and monitoring."""
@staticmethod
def get_memory_usage() -> float:
"""
Get current memory usage in MB.
Returns:
Current memory usage
"""
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
@staticmethod
@contextmanager
def track_memory() -> Generator[None, None, None]:
"""
Track memory usage within a context.
Example:
>>> with MemoryManager.track_memory():
... process_large_dataset()
"""
gc.collect()
start_mem = MemoryManager.get_memory_usage()
try:
yield
finally:
gc.collect()
end_mem = MemoryManager.get_memory_usage()
print(f"Memory change: {end_mem - start_mem:.2f}MB")
def process_large_dataset(
filename: str,
chunk_size: int = 1000
) -> Iterator[list[Any]]:
"""
Process large datasets in chunks to manage memory.
Args:
filename: File to process
chunk_size: Size of each chunk
Yields:
Processed data chunks
"""
with open(filename, 'r') as f:
chunk = []
for i, line in enumerate(f, 1):
chunk.append(process_line(line))
if i % chunk_size == 0:
yield chunk
chunk = []
if chunk:
yield chunk
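A consumption sketch: because the function yields chunks lazily, peak memory stays bounded by `chunk_size` regardless of file size (`handle_chunk` is a hypothetical consumer):

```python
for chunk in process_large_dataset("data.txt", chunk_size=500):
    handle_chunk(chunk)  # hypothetical downstream consumer
```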
"""CPU performance optimization patterns."""
from functools import lru_cache
import time
from typing import Any, Callable, TypeVar
T = TypeVar('T')
def memoize(func: Callable[..., T]) -> Callable[..., T]:
"""
Memoization decorator for expensive functions.
Args:
func: Function to memoize
Returns:
Memoized function
Example:
>>> @memoize
... def fibonacci(n: int) -> int:
... if n < 2: return n
... return fibonacci(n-1) + fibonacci(n-2)
"""
    # Note: functools.lru_cache is usually preferable for hashable arguments
    cache: dict = {}

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
return wrapper
class PerformanceMonitor:
"""Monitor and optimize performance."""
@staticmethod
@contextmanager
def timer(name: str) -> Generator[None, None, None]:
"""
Time execution of a code block.
Args:
name: Name of the operation
Example:
>>> with PerformanceMonitor.timer("data_processing"):
... process_data()
"""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{name} took {elapsed:.2f} seconds")
@staticmethod
def profile_function(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Profile function execution time.
Args:
func: Function to profile
Returns:
Profiled function
"""
def wrapper(*args: Any, **kwargs: Any) -> Any:
with PerformanceMonitor.timer(func.__name__):
return func(*args, **kwargs)
return wrapper
from multiprocessing import Pool, cpu_count
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar('T')
U = TypeVar('U')
class ParallelProcessor:
"""Parallel processing utility."""
def __init__(
self,
num_processes: Optional[int] = None,
maxtasksperchild: Optional[int] = None
) -> None:
"""
Initialize parallel processor.
Args:
num_processes: Number of processes to use
maxtasksperchild: Max tasks per child process
"""
self.num_processes = num_processes or cpu_count()
self.maxtasksperchild = maxtasksperchild
def map(
self,
func: Callable[[T], U],
items: Sequence[T]
) -> List[U]:
"""
Process items in parallel.
Args:
func: Function to apply
items: Items to process
Returns:
Processed results
Example:
>>> processor = ParallelProcessor()
>>> results = processor.map(str.upper, ['a', 'b', 'c'])
"""
with Pool(
processes=self.num_processes,
maxtasksperchild=self.maxtasksperchild
) as pool:
return pool.map(func, items)
import asyncio
from typing import List, TypeVar, Sequence, Callable, Awaitable
T = TypeVar('T')
U = TypeVar('U')
class AsyncProcessor:
"""Asynchronous processing utility."""
def __init__(self, batch_size: int = 10) -> None:
"""
Initialize async processor.
Args:
batch_size: Size of processing batches
"""
self.batch_size = batch_size
self.semaphore = asyncio.Semaphore(batch_size)
async def process_batch(
self,
func: Callable[[T], Awaitable[U]],
items: Sequence[T]
) -> List[U]:
"""
Process items asynchronously in batches.
Args:
func: Async function to apply
items: Items to process
Returns:
Processed results
"""
async def process_item(item: T) -> U:
async with self.semaphore:
return await func(item)
tasks = [process_item(item) for item in items]
return await asyncio.gather(*tasks)
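A runnable sketch of the batch processor with a stand-in coroutine (the semaphore caps in-flight tasks at `batch_size`):

```python
async def fetch(item: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for real I/O
    return item.upper()

async def main() -> None:
    processor = AsyncProcessor(batch_size=5)
    results = await processor.process_batch(fetch, ["a", "b", "c"])
    print(results)  # ['A', 'B', 'C']

asyncio.run(main())
```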
from contextlib import contextmanager
from typing import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
class DatabaseManager:
"""Database connection and pooling manager."""
def __init__(
self,
url: str,
pool_size: int = 5,
max_overflow: int = 10,
pool_timeout: int = 30
) -> None:
"""
Initialize database manager.
Args:
url: Database URL
pool_size: Connection pool size
max_overflow: Maximum overflow connections
pool_timeout: Pool timeout in seconds
"""
self.engine = create_engine(
url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_timeout=pool_timeout
)
self.Session = sessionmaker(bind=self.engine)
@contextmanager
def session_scope(self) -> Generator[Session, None, None]:
"""
Provide transactional scope around operations.
Yields:
Database session
Example:
>>> with db.session_scope() as session:
... session.query(User).all()
"""
session = self.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
from typing import Any, Dict, List

from sqlalchemy import text
from sqlalchemy.orm import Session
class QueryOptimizer:
"""SQL query optimization utilities."""
@staticmethod
def optimize_query(query: str) -> str:
"""
Optimize SQL query.
Args:
query: SQL query to optimize
Returns:
Optimized query
"""
# Add query optimization logic
return query
    @staticmethod
    def batch_insert(
        session: Session,
        table: str,
        items: List[Dict[str, Any]],
        batch_size: int = 1000
    ) -> None:
        """
        Perform batch insert operations.

        Args:
            session: Database session
            table: Target table (must come from trusted code, never user
                input, since it is interpolated into the SQL string)
            items: Items to insert (all dicts must share the same keys)
            batch_size: Size of each batch
        """
        if not items:
            return
        columns = ", ".join(items[0])
        placeholders = ", ".join(f":{key}" for key in items[0])
        statement = text(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        )
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            # Passing a list of dicts triggers executemany-style binding
            session.execute(statement, batch)
        session.commit()
"""RESTful API implementation patterns."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from enum import Enum
class SortOrder(str, Enum):
"""Sort order enumeration."""
ASC = "asc"
DESC = "desc"
class PaginationParams(BaseModel):
"""Pagination parameters."""
page: int = Field(default=1, ge=1)
limit: int = Field(default=10, ge=1, le=100)
sort_by: Optional[str] = None
sort_order: SortOrder = SortOrder.ASC
class APIResponse(BaseModel):
"""Standard API response model."""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
meta: Optional[Dict[str, Any]] = None
class BaseAPIController:
"""Base controller for API endpoints."""
def __init__(self, service: Any) -> None:
"""
Initialize controller with service.
Args:
service: Service layer instance
"""
self.service = service
async def paginate(
self,
items: List[Any],
params: PaginationParams
) -> Dict[str, Any]:
"""
Paginate items with metadata.
Args:
items: Items to paginate
params: Pagination parameters
Returns:
Paginated results with metadata
"""
start = (params.page - 1) * params.limit
end = start + params.limit
if params.sort_by:
items.sort(
key=lambda x: getattr(x, params.sort_by),
reverse=params.sort_order == SortOrder.DESC
)
return {
"items": items[start:end],
"meta": {
"total": len(items),
"page": params.page,
"limit": params.limit,
"pages": (len(items) + params.limit - 1) // params.limit
}
}
from fastapi import APIRouter
from typing import Generic, TypeVar
T = TypeVar('T', bound=BaseModel)
class CRUDRouter(Generic[T]):
"""Generic CRUD router implementation."""
    def __init__(
        self,
        model: type[T],
        service: Any,
        prefix: str,
        tags: List[str]
    ) -> None:
        """
        Initialize CRUD router.

        Args:
            model: Pydantic model class
            service: Service layer instance backing the routes
            prefix: URL prefix
            tags: API tags
        """
        self.router = APIRouter(prefix=prefix, tags=tags)
        self.model = model
        self.service = service
        self.controller = BaseAPIController(service)
        self._register_routes()
def _register_routes(self) -> None:
"""Register CRUD routes."""
@self.router.get("/")
async def list_items(
pagination: PaginationParams = Depends()
) -> APIResponse:
"""List items with pagination."""
            try:
                items = await self.service.list_items()
                paginated = await self.controller.paginate(items, pagination)
return APIResponse(
success=True,
data=paginated["items"],
meta=paginated["meta"]
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
@self.router.get("/{item_id}")
async def get_item(item_id: int) -> APIResponse:
"""Get single item by ID."""
try:
item = await self.service.get_item(item_id)
if not item:
raise HTTPException(
status_code=404,
detail="Item not found"
)
return APIResponse(success=True, data=item)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
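A mounting sketch for the generic router (`UserModel` and `user_service` are illustrative placeholders):

```python
app = FastAPI()
users = CRUDRouter(
    model=UserModel,       # illustrative pydantic model
    service=user_service,  # illustrative service-layer instance
    prefix="/users",
    tags=["users"],
)
app.include_router(users.router)
```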
from datetime import datetime
from typing import List, Optional

import strawberry
@strawberry.type
class User:
"""User type for GraphQL schema."""
id: int
username: str
email: str
created_at: datetime
@strawberry.field
async def posts(self) -> List['Post']:
"""Get user's posts."""
return await get_user_posts(self.id)
@strawberry.type
class Query:
"""Root query type."""
@strawberry.field
async def user(self, id: int) -> Optional[User]:
"""Get user by ID."""
return await get_user(id)
@strawberry.field
async def users(
self,
limit: int = 10,
offset: int = 0
) -> List[User]:
"""Get paginated users."""
return await get_users(limit, offset)
schema = strawberry.Schema(query=Query)
from enum import Enum
from typing import Dict

from fastapi import FastAPI, APIRouter
class APIVersion(str, Enum):
"""API version enumeration."""
V1 = "v1"
V2 = "v2"
class VersionedAPI:
"""Versioned API manager."""
def __init__(self) -> None:
"""Initialize versioned API."""
self.app = FastAPI()
self.routers: Dict[APIVersion, APIRouter] = {}
for version in APIVersion:
self.routers[version] = APIRouter(
prefix=f"/api/{version}"
)
def include_router(
self,
router: APIRouter,
version: APIVersion
) -> None:
"""
Include router for specific version.
Args:
router: Router to include
version: API version
"""
self.routers[version].include_router(router)
self.app.include_router(self.routers[version])
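A usage sketch: the same router logic can be mounted under each version prefix (`users_router` is an illustrative `APIRouter`):

```python
api = VersionedAPI()
api.include_router(users_router, APIVersion.V1)  # served at /api/v1/...
api.include_router(users_router, APIVersion.V2)  # served at /api/v2/...
```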
import time
from collections import defaultdict
from typing import DefaultDict, List

from fastapi import Request, HTTPException
class RateLimiter:
"""API rate limiting implementation."""
def __init__(
self,
requests: int = 100,
window: int = 60
) -> None:
"""
Initialize rate limiter.
Args:
requests: Maximum requests per window
window: Time window in seconds
"""
self.requests = requests
self.window = window
self.clients: DefaultDict[
str, List[float]
] = defaultdict(list)
async def check_rate_limit(
self,
request: Request
) -> None:
"""
Check rate limit for request.
Args:
request: FastAPI request
Raises:
HTTPException: If rate limit exceeded
"""
client = request.client.host
now = time.time()
# Clean old requests
self.clients[client] = [
timestamp
for timestamp in self.clients[client]
if now - timestamp < self.window
]
if len(self.clients[client]) >= self.requests:
raise HTTPException(
status_code=429,
detail="Rate limit exceeded"
)
self.clients[client].append(now)
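A wiring sketch: because `check_rate_limit` takes a `Request`, it can be attached directly as a FastAPI dependency (`app` and `Depends` as imported earlier in this section):

```python
limiter = RateLimiter(requests=100, window=60)

@app.get("/items", dependencies=[Depends(limiter.check_rate_limit)])
async def list_items() -> dict[str, str]:
    return {"status": "ok"}
```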
from typing import Any, Dict

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi


def custom_openapi(app: FastAPI) -> Dict[str, Any]:
"""
Customize OpenAPI documentation.
Args:
app: FastAPI application
Returns:
        Customized OpenAPI schema
"""
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="Your API",
version="1.0.0",
description="API documentation",
routes=app.routes,
)
openapi_schema["info"]["x-logo"] = {
"url": "https://your-logo-url.com/logo.png"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app = FastAPI()
app.openapi = lambda: custom_openapi(app)
# Multi-stage build
FROM python:3.9-slim as builder
# Set working directory
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Rye
RUN curl -sSf https://rye-up.com/get | bash
ENV PATH="/root/.rye/shims:${PATH}"
# Copy dependency files
COPY pyproject.toml .
COPY requirements.lock .
COPY requirements-dev.lock .
# Install dependencies
RUN rye sync --no-dev
RUN rye build
# Production image
FROM python:3.9-slim
WORKDIR /app
# Copy built packages from builder
COPY --from=builder /app/dist /app/dist
COPY --from=builder /app/.venv /app/.venv
# Copy application code
COPY src/ src/
COPY alembic/ alembic/
COPY alembic.ini .
# Environment variables (put the copied virtualenv's executables on PATH)
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
ENV PATH="/app/.venv/bin:${PATH}"
# Run with production server
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
app:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/dbname
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
db:
image: postgres:13
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=dbname
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d dbname"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:6
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
volumes:
postgres_data:
redis_data:
# kubernetes/base/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-deployment
spec:
replicas: 3
selector:
matchLabels:
app: python-app
template:
metadata:
labels:
app: python-app
spec:
containers:
- name: python-app
image: your-registry/app:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: app-secrets
key: database-url
resources:
requests:
cpu: "100m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 15
periodSeconds: 20
# kubernetes/base/service.yaml
apiVersion: v1
kind: Service
metadata:
name: app-service
spec:
selector:
app: python-app
ports:
- port: 80
targetPort: 8000
type: ClusterIP
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install Rye
run: curl -sSf https://rye-up.com/get | bash
- name: Install dependencies
run: |
source $HOME/.rye/env
rye sync --all-features
- name: Run tests
run: |
source $HOME/.rye/env
rye run pytest
- name: Build and push Docker image
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
run: |
docker build -t ${{ secrets.REGISTRY }}/app:${{ github.sha }} .
docker push ${{ secrets.REGISTRY }}/app:${{ github.sha }}
deploy:
needs: test
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Deploy to Kubernetes
uses: azure/k8s-deploy@v1
with:
manifests: |
kubernetes/production/*.yaml
# src/config/environment.py
from pydantic_settings import BaseSettings
from typing import Optional
from enum import Enum
class Environment(str, Enum):
"""Environment enumeration."""
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
class DeploymentConfig(BaseSettings):
"""Deployment configuration."""
# Environment
ENVIRONMENT: Environment = Environment.DEVELOPMENT
DEBUG: bool = False
# Application
APP_NAME: str = "python-app"
APP_VERSION: str = "1.0.0"
# Server
HOST: str = "0.0.0.0"
PORT: int = 8000
WORKERS: int = 4
# Database
DATABASE_URL: str
DATABASE_POOL_SIZE: int = 5
DATABASE_POOL_RECYCLE: int = 3600
# Redis
REDIS_URL: Optional[str] = None
# Monitoring
SENTRY_DSN: Optional[str] = None
PROMETHEUS_ENABLED: bool = False
class Config:
"""Configuration settings."""
env_file = ".env"
env_file_encoding = "utf-8"
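A loading sketch: `pydantic-settings` reads values from the environment and `.env`, and raises on missing required fields such as `DATABASE_URL`:

```python
config = DeploymentConfig()  # raises ValidationError if DATABASE_URL is unset
if config.ENVIRONMENT is Environment.PRODUCTION:
    assert config.DEBUG is False, "DEBUG must stay off in production"
```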
# src/monitoring/metrics.py
import time
from functools import wraps
from typing import Any, Callable

from prometheus_client import Counter, Histogram, Info
# Metrics definition
REQUEST_COUNT = Counter(
'app_request_count',
'Application Request Count',
['method', 'endpoint', 'http_status']
)
REQUEST_LATENCY = Histogram(
'app_request_latency_seconds',
'Application Request Latency',
['method', 'endpoint']
)
def track_request_metrics(func: Callable[..., Any]) -> Callable[..., Any]:
    """Track request metrics decorator."""
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        method = kwargs.get('method', 'UNKNOWN')
        endpoint = kwargs.get('endpoint', 'UNKNOWN')
        start_time = time.time()
        try:
            response = await func(*args, **kwargs)
            status = response.status_code
        except Exception:
            status = 500
            raise
finally:
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
http_status=status
).inc()
REQUEST_LATENCY.labels(
method=method,
endpoint=endpoint
).observe(duration)
return response
return wrapper
# pyproject.toml
[project]
name = "your-project"
version = "0.1.0"
requires-python = ">=3.9"
dependencies = [
"fastapi>=0.68.0",
"sqlalchemy>=1.4.0",
"pydantic>=2.0.0",
"alembic>=1.7.0",
]
[tool.rye]
managed = true
dev-dependencies = [
"pytest>=7.0.0",
"pytest-asyncio>=0.18.0",
"pytest-cov>=3.0.0",
"black>=22.3.0",
"ruff>=0.1.0",
]
[tool.rye.scripts]
test = "pytest tests/"
lint = "ruff check ."
format = "ruff format ."
"""Version management utilities."""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Dict, Set, Optional
import toml
class DependencyManager:
"""Manage project dependencies."""
def __init__(self, project_root: Path) -> None:
"""
Initialize dependency manager.
Args:
project_root: Project root directory
"""
self.project_root = project_root
self.pyproject_path = project_root / "pyproject.toml"
self.requirements_path = project_root / "requirements.lock"
def get_dependencies(self) -> Dict[str, str]:
"""
Get project dependencies.
Returns:
Dictionary of dependencies and versions
"""
with open(self.pyproject_path) as f:
pyproject = toml.load(f)
        # Assumes every dependency is pinned with ">=", as in the examples above
        return {
            dep.split(">=")[0]: dep.split(">=")[1]
            for dep in pyproject["project"]["dependencies"]
        }
def check_updates(self) -> Dict[str, Dict[str, str]]:
"""
Check for dependency updates.
Returns:
Dictionary of available updates
"""
updates = {}
for pkg, version in self.get_dependencies().items():
result = subprocess.run(
["rye", "search", pkg],
capture_output=True,
text=True
)
latest = result.stdout.strip().split("\n")[-1].split()[-1]
if latest != version:
updates[pkg] = {
"current": version,
"latest": latest
}
return updates
def update_dependencies(self, packages: Optional[Set[str]] = None) -> None:
"""
Update project dependencies.
Args:
packages: Specific packages to update
"""
cmd = ["rye", "sync", "--update"]
if packages:
cmd.extend(packages)
subprocess.run(cmd, check=True)
"""Virtual environment management utilities."""
import os
import subprocess
import venv
from pathlib import Path
from typing import Optional
class VenvManager:
"""Manage virtual environments."""
def __init__(self, project_root: Path) -> None:
"""
Initialize virtual environment manager.
Args:
project_root: Project root directory
"""
self.project_root = project_root
self.venv_path = project_root / ".venv"
def create_venv(self) -> None:
"""Create virtual environment."""
venv.create(
self.venv_path,
system_site_packages=False,
with_pip=True,
upgrade_deps=True
)
    def activate_venv(self) -> None:
        """Activate virtual environment for the current process."""
        if os.name == "nt":  # Windows
            bin_dir = self.venv_path / "Scripts"
        else:  # Unix/Linux
            bin_dir = self.venv_path / "bin"
        if not bin_dir.exists():
            raise EnvironmentError("Virtual environment not found")
        os.environ["VIRTUAL_ENV"] = str(self.venv_path)
        os.environ["PATH"] = f"{bin_dir}{os.pathsep}{os.environ['PATH']}"
def install_dependencies(
self,
dev: bool = False,
upgrade: bool = False
) -> None:
"""
Install project dependencies.
Args:
dev: Install development dependencies
upgrade: Upgrade existing packages
"""
cmd = ["rye", "sync"]
if dev:
cmd.append("--all-features")
if upgrade:
cmd.append("--update")
subprocess.run(cmd, check=True)
"""Lock file management utilities."""
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set
import toml
class LockFileManager:
"""Manage dependency lock files."""
def __init__(self, project_root: Path) -> None:
"""
Initialize lock file manager.
Args:
project_root: Project root directory
"""
self.project_root = project_root
self.lock_file = project_root / "requirements.lock"
def generate_lock_file(self) -> None:
"""Generate lock file from current environment."""
subprocess.run(
["rye", "lock"],
check=True
)
def verify_dependencies(self) -> bool:
"""
Verify installed dependencies match lock file.
Returns:
True if dependencies match
"""
try:
subprocess.run(
["rye", "sync", "--check"],
check=True
)
return True
except subprocess.CalledProcessError:
return False
def get_dependency_tree(self) -> Dict[str, List[str]]:
"""
Get dependency tree.
Returns:
Dictionary representing dependency tree
"""
result = subprocess.run(
["rye", "deps", "--tree"],
capture_output=True,
text=True
)
# Parse and return dependency tree
return self._parse_dependency_tree(result.stdout)
def _parse_dependency_tree(self, output: str) -> Dict[str, List[str]]:
"""Parse dependency tree output."""
tree: Dict[str, List[str]] = {}
current_pkg = None
for line in output.split("\n"):
if not line.startswith(" "):
current_pkg = line.split("[")[0].strip()
tree[current_pkg] = []
elif current_pkg and line.strip():
tree[current_pkg].append(
line.strip().split("[")[0].strip()
)
return tree
"""Requirements file generation utilities."""
from pathlib import Path
from typing import Set
import subprocess
class RequirementsManager:
"""Manage requirements files."""
def __init__(self, project_root: Path) -> None:
"""
Initialize requirements manager.
Args:
project_root: Project root directory
"""
self.project_root = project_root
self.requirements_dir = project_root / "requirements"
self.requirements_dir.mkdir(exist_ok=True)
def generate_requirements(self) -> None:
"""Generate requirements files."""
# Base requirements
subprocess.run(
["rye", "export", "-o", "requirements/base.txt"],
check=True
)
# Development requirements
subprocess.run(
["rye", "export", "--dev", "-o", "requirements/dev.txt"],
check=True
)
# Production requirements
subprocess.run(
[
"rye", "export",
"--no-dev",
"--production",
"-o", "requirements/production.txt"
],
check=True
)
def check_security(self) -> Set[str]:
"""
Check for security vulnerabilities.
Returns:
Set of vulnerable packages
"""
result = subprocess.run(
["safety", "check"],
capture_output=True,
text=True
)
return {
line.split("[")[0].strip()
for line in result.stdout.split("\n")
if "[-]" in line
}
"""Core logging configuration and utilities."""
from __future__ import annotations
import sys
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional, Union
from loguru import logger
from pydantic import BaseModel
class LogLevel(str, Enum):
"""Log level enumeration."""
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class LogConfig(BaseModel):
"""Logging configuration."""
# General settings
level: LogLevel = LogLevel.INFO
format: str = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<level>{message}</level> | "
"{extra}"
)
# File settings
log_dir: Path = Path("logs")
rotation: str = "500 MB"
retention: str = "10 days"
compression: str = "zip"
class LoggerManager:
"""Manage application logging."""
def __init__(self, config: LogConfig) -> None:
"""
Initialize logger manager.
Args:
config: Logging configuration
"""
self.config = config
self._configure_logger()
def _configure_logger(self) -> None:
"""Configure loguru logger."""
# Remove default handler
logger.remove()
# Configure console handler
logger.add(
sys.stderr,
format=self.config.format,
level=self.config.level.value,
backtrace=True,
diagnose=True
)
# Configure file handler
self.config.log_dir.mkdir(exist_ok=True)
logger.add(
self.config.log_dir / "app.log",
format=self.config.format,
level=self.config.level.value,
rotation=self.config.rotation,
retention=self.config.retention,
compression=self.config.compression,
backtrace=True,
diagnose=True
)
@staticmethod
def bind_context(**kwargs: Any) -> None:
"""
Bind context to logger.
Args:
**kwargs: Context key-value pairs
"""
logger.configure(extra=kwargs)
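A startup sketch: configure logging once at application boot, then use the global `logger` everywhere:

```python
manager = LoggerManager(LogConfig(level=LogLevel.DEBUG))
logger.info("application started")  # goes to stderr and logs/app.log
```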
"""Contextual logging utilities."""
from contextvars import ContextVar
from functools import wraps
from typing import Any, Callable, Optional
from uuid import uuid4
request_id: ContextVar[str] = ContextVar('request_id', default='')
def with_logger_context(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Add context to logger for function execution.

    Args:
        func: Function to wrap

    Returns:
        Wrapped function with logging context
    """
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        context_id = str(uuid4())
        request_id.set(context_id)
        # contextualize() injects request_id into every record emitted in
        # this block; bind() would only return a new logger instance and
        # leave the global logger unchanged.
        with logger.contextualize(request_id=context_id):
            return await func(*args, **kwargs)
    return wrapper
class RequestLogger:
"""Request logging utility."""
@staticmethod
async def log_request(
request: Any,
response: Any,
execution_time: float
) -> None:
"""
Log request details.
Args:
request: Request object
response: Response object
execution_time: Request execution time
"""
logger.info(
"Request processed",
extra={
"request_id": request_id.get(),
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"execution_time": execution_time,
"client_ip": request.client.host
}
)
"""Prometheus metrics collection."""
from prometheus_client import (
Counter,
Gauge,
Histogram,
Summary,
CollectorRegistry
)
class MetricsCollector:
"""Collect and manage application metrics."""
def __init__(self) -> None:
"""Initialize metrics collector."""
self.registry = CollectorRegistry()
self._setup_metrics()
def _setup_metrics(self) -> None:
"""Setup metric collectors."""
# Request metrics
self.request_count = Counter(
'app_request_total',
'Total request count',
['method', 'endpoint', 'status'],
registry=self.registry
)
self.request_latency = Histogram(
'app_request_latency_seconds',
'Request latency in seconds',
['method', 'endpoint'],
registry=self.registry
)
# Business metrics
self.active_users = Gauge(
'app_active_users',
'Number of active users',
registry=self.registry
)
self.task_processing_time = Summary(
'app_task_processing_seconds',
'Task processing time in seconds',
['task_type'],
registry=self.registry
)
def track_request(
self,
method: str,
endpoint: str,
status: int,
duration: float
) -> None:
"""
Track request metrics.
Args:
method: HTTP method
endpoint: Request endpoint
status: Response status code
duration: Request duration
"""
self.request_count.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
self.request_latency.labels(
method=method,
endpoint=endpoint
).observe(duration)
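A usage sketch: record one request and render the dedicated registry in Prometheus text format (the values and the /metrics endpoint wiring are illustrative):
from prometheus_client import generate_latest

metrics = MetricsCollector()
metrics.track_request("GET", "/users", 200, 0.042)
metrics.active_users.set(17)

# Text exposition suitable for a /metrics endpoint.
print(generate_latest(metrics.registry).decode())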
"""Application health monitoring."""
from datetime import datetime
from typing import Any, Dict
import psutil
from loguru import logger
class HealthCheck:
"""Application health monitoring."""
def __init__(self) -> None:
"""Initialize health checker."""
self.start_time = datetime.now()
def check_system_health(self) -> Dict[str, Any]:
"""
Check system health metrics.
Returns:
Dictionary of health metrics
"""
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage("/").percent,
"uptime": (datetime.now() - self.start_time).total_seconds()
}
async def check_database(self) -> bool:
"""
Check database connectivity.
Returns:
True if database is healthy
"""
try:
await db.execute("SELECT 1")
return True
except Exception as e:
logger.error(f"Database health check failed: {e}")
return False
async def check_dependencies(self) -> Dict[str, bool]:
"""
Check external dependencies.
Returns:
Dictionary of dependency statuses
"""
checks = {
"redis": await self._check_redis(),
"cache": await self._check_cache(),
"external_api": await self._check_external_api()
}
return checks
"""Application performance monitoring."""
import cProfile
import pstats
import time
from functools import wraps
from typing import Any, Callable
from loguru import logger
class PerformanceMonitor:
"""Monitor application performance."""
def __init__(self, threshold_ms: float = 100) -> None:
"""
Initialize performance monitor.
Args:
threshold_ms: Slow operation threshold
"""
self.threshold_ms = threshold_ms
@staticmethod
def profile(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Profile function execution.
Args:
func: Function to profile
Returns:
Profiled function
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
profiler = cProfile.Profile()
try:
return profiler.runcall(func, *args, **kwargs)
finally:
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats()
return wrapper
def monitor_performance(
self,
operation: str
) -> Callable[..., Any]:
"""
Monitor operation performance.
Args:
operation: Operation name
Returns:
Monitoring decorator
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
start_time = time.perf_counter()
try:
result = await func(*args, **kwargs)
return result
finally:
duration_ms = (time.perf_counter() - start_time) * 1000
if duration_ms > self.threshold_ms:
logger.warning(
"Slow operation detected",
operation=operation,
duration_ms=duration_ms
)
return wrapper
return decorator
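Usage sketch for the monitor; the threshold, operation name, and workload are illustrative:
import asyncio

monitor = PerformanceMonitor(threshold_ms=250)

@monitor.monitor_performance("load_report")
async def load_report(report_id: int) -> dict:
    await asyncio.sleep(0.3)  # stand-in for slow I/O; exceeds the 250 ms threshold
    return {"id": report_id}

asyncio.run(load_report(7))  # logs "Slow operation detected"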
"""Custom exception hierarchy."""
from __future__ import annotations
from typing import Any, Dict, Optional
from loguru import logger
class BaseError(Exception):
"""Base exception for all custom errors."""
def __init__(
self,
message: str,
*,
code: Optional[str] = None,
details: Optional[Dict[str, Any]] = None
) -> None:
"""
Initialize base error.
Args:
message: Error message
code: Error code
details: Additional error details
"""
super().__init__(message)
self.message = message
self.code = code or self.__class__.__name__
self.details = details or {}
self._log_error()
def _log_error(self) -> None:
"""Log error details."""
logger.error(
self.message,
error_code=self.code,
error_type=self.__class__.__name__,
**self.details
)
def to_dict(self) -> Dict[str, Any]:
"""
Convert error to dictionary.
Returns:
Dictionary representation of error
"""
return {
"error": self.code,
"message": self.message,
"details": self.details
}
class ValidationError(BaseError):
"""Validation error."""
pass
class DatabaseError(BaseError):
"""Database operation error."""
pass
class APIError(BaseError):
"""API-related error."""
pass
class ServiceError(BaseError):
"""Service operation error."""
pass
class ConfigurationError(BaseError):
"""Configuration-related error."""
pass
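Raising and serializing one of these errors looks like this (values are illustrative):
try:
    raise ValidationError(
        "email is malformed",
        code="INVALID_EMAIL",
        details={"field": "email"},
    )
except BaseError as exc:
    payload = exc.to_dict()
    # {'error': 'INVALID_EMAIL', 'message': 'email is malformed', 'details': {'field': 'email'}}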
"""Retry mechanism implementation."""
from functools import wraps
from typing import Any, Callable, Optional, Tuple, Type
import asyncio
import time
from loguru import logger
# ServiceError comes from the exception hierarchy defined above
class RetryConfig:
"""Retry configuration."""
def __init__(
self,
max_retries: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> None:
"""
Initialize retry configuration.
Args:
max_retries: Maximum retry attempts
delay: Initial delay between retries
backoff: Backoff multiplier
exceptions: Exceptions to retry on
"""
self.max_retries = max_retries
self.delay = delay
self.backoff = backoff
self.exceptions = exceptions
class RetryHandler:
"""Handle operation retries."""
@staticmethod
def with_retry(
config: Optional[RetryConfig] = None
) -> Callable[..., Any]:
"""
Retry decorator.
Args:
config: Retry configuration
Returns:
Decorated function
"""
config = config or RetryConfig()
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
last_exception: Optional[Exception] = None
delay = config.delay
for attempt in range(config.max_retries):
try:
return await func(*args, **kwargs)
except config.exceptions as e:
last_exception = e
if attempt < config.max_retries - 1:
logger.warning(
f"Retry attempt {attempt + 1} of {config.max_retries}",
error=str(e)
)
await asyncio.sleep(delay)
delay *= config.backoff
raise last_exception or ServiceError("Max retries exceeded")
@wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
last_exception: Optional[Exception] = None
delay = config.delay
for attempt in range(config.max_retries):
try:
return func(*args, **kwargs)
except config.exceptions as e:
last_exception = e
if attempt < config.max_retries - 1:
logger.warning(
f"Retry attempt {attempt + 1} of {config.max_retries}",
error=str(e)
)
time.sleep(delay)
delay *= config.backoff
raise last_exception or ServiceError("Max retries exceeded")
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
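Usage sketch; the wrapped coroutine and exception choice are illustrative:
@RetryHandler.with_retry(
    RetryConfig(max_retries=5, delay=0.5, backoff=2.0, exceptions=(ConnectionError,))
)
async def fetch_remote_data(url: str) -> bytes:
    # Hypothetical network call that may raise ConnectionError.
    raise ConnectionError(f"transient failure fetching {url}")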
"""Circuit breaker implementation."""
from enum import Enum
from datetime import datetime, timedelta
from typing import Any, Callable, Optional
# ServiceError comes from the exception hierarchy defined above
class CircuitState(Enum):
"""Circuit breaker states."""
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker pattern implementation."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_timeout: int = 30
) -> None:
"""
Initialize circuit breaker.
Args:
failure_threshold: Failures before opening
recovery_timeout: Seconds before recovery attempt
half_open_timeout: Seconds in half-open state
"""
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_timeout = half_open_timeout
self.failures = 0
self.last_failure_time: Optional[datetime] = None
self.state = CircuitState.CLOSED
async def call(
self,
func: Callable[..., Any],
*args: Any,
**kwargs: Any
) -> Any:
"""
Make protected call through circuit breaker.
Args:
func: Function to call
*args: Function arguments
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
ServiceError: If circuit is open
"""
if await self._is_open():
raise ServiceError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
await self._on_success()
return result
except Exception:
await self._on_failure()
raise
async def _is_open(self) -> bool:
"""Check if circuit is open."""
if self.state == CircuitState.OPEN:
if self.last_failure_time:
if (datetime.now() - self.last_failure_time
> timedelta(seconds=self.recovery_timeout)):
self.state = CircuitState.HALF_OPEN
return False
return True
return False
async def _on_success(self) -> None:
"""Handle successful call."""
self.failures = 0
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
async def _on_failure(self) -> None:
"""Handle failed call."""
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = CircuitState.OPEN
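Usage sketch; `charge` stands in for any fragile downstream coroutine and is not part of the original:
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

async def charge(amount: int) -> dict:
    # Hypothetical downstream call protected by the breaker.
    return {"charged": amount}

async def pay() -> dict:
    return await breaker.call(charge, amount=100)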
"""Error reporting system."""
from typing import Any, Dict, List, Optional, Protocol
import traceback
from datetime import datetime
from loguru import logger
class ErrorReport:
"""Error report structure."""
def __init__(
self,
error: Exception,
context: Optional[Dict[str, Any]] = None
) -> None:
"""
Initialize error report.
Args:
error: Exception to report
context: Additional context
"""
self.error = error
self.context = context or {}
self.timestamp = datetime.now()
self.traceback = traceback.format_exc()  # captures the active traceback when constructed inside an except block
def to_dict(self) -> Dict[str, Any]:
"""
Convert report to dictionary.
Returns:
Dictionary representation of report
"""
return {
"error_type": self.error.__class__.__name__,
"message": str(self.error),
"timestamp": self.timestamp.isoformat(),
"traceback": self.traceback,
"context": self.context
}
class ErrorHandler(Protocol):
"""Protocol for error report handlers (e.g. Sentry, email, log backends)."""
async def handle(self, report: ErrorReport) -> None:
"""Deliver an error report."""
...
class ErrorReporter:
"""Handle error reporting to various backends."""
def __init__(self) -> None:
"""Initialize error reporter."""
self.handlers: List[ErrorHandler] = []
def add_handler(self, handler: ErrorHandler) -> None:
"""
Add error handler.
Args:
handler: Error handler to add
"""
self.handlers.append(handler)
async def report(
self,
error: Exception,
context: Optional[Dict[str, Any]] = None
) -> None:
"""
Report error to all handlers.
Args:
error: Exception to report
context: Additional context
"""
report = ErrorReport(error, context)
for handler in self.handlers:
try:
await handler.handle(report)
except Exception as e:
logger.error(
f"Error handler failed: {e}",
handler=handler.__class__.__name__
)
"""Git branch management utilities."""
from __future__ import annotations
import subprocess
from enum import Enum
from pathlib import Path
from typing import Optional
class BranchType(str, Enum):
"""Branch type enumeration."""
FEATURE = "feature"
BUGFIX = "bugfix"
HOTFIX = "hotfix"
RELEASE = "release"
MAIN = "main"
DEVELOP = "develop"
class GitFlow:
"""GitFlow workflow implementation."""
def __init__(self, repo_path: Path) -> None:
"""
Initialize GitFlow manager.
Args:
repo_path: Path to git repository
"""
self.repo_path = repo_path
def create_branch(
self,
branch_type: BranchType,
name: str,
from_branch: Optional[str] = None
) -> str:
"""
Create new branch following GitFlow patterns.
Args:
branch_type: Type of branch to create
name: Branch name
from_branch: Base branch
Returns:
Created branch name
"""
if branch_type == BranchType.MAIN:
raise ValueError("Cannot create main branch")
branch_name = f"{branch_type.value}/{name}"
base_branch = from_branch or self._get_base_branch(branch_type)
subprocess.run(
["git", "checkout", "-b", branch_name, base_branch],
check=True,
cwd=self.repo_path
)
return branch_name
def _get_base_branch(self, branch_type: BranchType) -> str:
"""Get base branch for branch type."""
if branch_type in {BranchType.FEATURE, BranchType.BUGFIX}:
return "develop"
if branch_type == BranchType.HOTFIX:
return "main"
if branch_type == BranchType.RELEASE:
return "develop"
return "main"
"""Commit message management."""
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class CommitMessage:
"""Structured commit message."""
type: str
scope: Optional[str]
subject: str
body: Optional[str]
breaking_change: Optional[str]
def format(self) -> str:
"""
Format commit message.
Returns:
Formatted commit message
Example:
feat(api): add user authentication
Implement JWT-based authentication for API endpoints.
BREAKING CHANGE: requires new environment variables
"""
header = f"{self.type}"
if self.scope:
header += f"({self.scope})"
header += f": {self.subject}"
message = [header]
if self.body:
message.extend(["", self.body])
if self.breaking_change:
message.extend(["", f"BREAKING CHANGE: {self.breaking_change}"])
return "\n".join(message)
class CommitValidator:
"""Validate commit messages."""
TYPES = {
"feat": "New feature",
"fix": "Bug fix",
"docs": "Documentation only changes",
"style": "Code style changes",
"refactor": "Code refactoring",
"perf": "Performance improvements",
"test": "Adding missing tests",
"chore": "Build process or auxiliary tool changes"
}
@classmethod
def validate(cls, message: str) -> bool:
"""
Validate commit message format.
Args:
message: Commit message to validate
Returns:
True if valid
"""
try:
commit = cls.parse(message)
return bool(
commit.type in cls.TYPES
and commit.subject
and len(commit.subject) <= 72
)
except ValueError:
return False
@classmethod
def parse(cls, message: str) -> CommitMessage:
"""Parse commit message into structured format."""
lines = message.split("\n")
if not lines:
raise ValueError("Empty commit message")
header = lines[0]
if ":" not in header:
raise ValueError("Invalid commit format")
type_scope, subject = header.split(":", 1)
type_scope = type_scope.strip()
type_str = type_scope
scope = None
if "(" in type_scope:
type_str, scope = (
type_scope.split("(", 1)[0],
type_scope.split("(", 1)[1].rstrip(")")
)
body = None
breaking_change = None
if len(lines) > 2:
body_lines = []
for line in lines[2:]:
if line.startswith("BREAKING CHANGE: "):
breaking_change = line.removeprefix("BREAKING CHANGE: ").strip()
else:
body_lines.append(line)
if body_lines:
body = "\n".join(body_lines).strip()
return CommitMessage(
type=type_str.strip(),
scope=scope.strip() if scope else None,
subject=subject.strip(),
body=body,
breaking_change=breaking_change
)
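Parsing a conventional commit with the validator (the message is illustrative):
message = "feat(api): add user authentication\n\nImplement JWT-based auth."
assert CommitValidator.validate(message)
commit = CommitValidator.parse(message)
# commit.type == "feat", commit.scope == "api", commit.subject == "add user authentication"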
"""Code review checklist implementation."""
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
class ReviewStatus(str, Enum):
"""Review status enumeration."""
PENDING = "pending"
APPROVED = "approved"
CHANGES_REQUESTED = "changes_requested"
COMMENTED = "commented"
@dataclass
class ReviewItem:
"""Code review checklist item."""
category: str
description: str
required: bool = True
checked: bool = False
comments: Optional[str] = None
class CodeReviewChecklist:
"""Manage code review checklist."""
def __init__(self) -> None:
"""Initialize code review checklist."""
self.items: List[ReviewItem] = [
ReviewItem(
"functionality",
"Code functions as intended"
),
ReviewItem(
"tests",
"Adequate test coverage"
),
ReviewItem(
"types",
"Type hints are complete and accurate"
),
ReviewItem(
"docs",
"Documentation is clear and complete"
),
ReviewItem(
"style",
"Code follows style guidelines"
),
ReviewItem(
"security",
"No security vulnerabilities"
),
ReviewItem(
"performance",
"No obvious performance issues"
)
]
def validate(self) -> tuple[bool, List[str]]:
"""
Validate review checklist.
Returns:
Tuple of (is_valid, issues)
"""
issues = []
for item in self.items:
if item.required and not item.checked:
issues.append(
f"{item.category}: {item.description}"
)
return not bool(issues), issues
"""Version management utilities."""
from dataclasses import dataclass
import re
from typing import Optional
@dataclass(order=True)
class Version:
"""Semantic version. order=True gives field-wise comparison; ordering is approximate for pre-release/build tags."""
major: int
minor: int
patch: int
pre_release: Optional[str] = None
build: Optional[str] = None
def __str__(self) -> str:
"""Format version string."""
version = f"{self.major}.{self.minor}.{self.patch}"
if self.pre_release:
version += f"-{self.pre_release}"
if self.build:
version += f"+{self.build}"
return version
@classmethod
def parse(cls, version_str: str) -> "Version":
"""Parse version string."""
pattern = (
r"^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
r"(?:-(?P<pre_release>[0-9A-Za-z-]+))?"
r"(?:\+(?P<build>[0-9A-Za-z-]+))?$"
)
match = re.match(pattern, version_str)
if not match:
raise ValueError(f"Invalid version string: {version_str}")
return cls(
major=int(match.group("major")),
minor=int(match.group("minor")),
patch=int(match.group("patch")),
pre_release=match.group("pre_release"),
build=match.group("build")
)
def bump_major(self) -> "Version":
"""Bump major version."""
return Version(
major=self.major + 1,
minor=0,
patch=0
)
def bump_minor(self) -> "Version":
"""Bump minor version."""
return Version(
major=self.major,
minor=self.minor + 1,
patch=0
)
def bump_patch(self) -> "Version":
"""Bump patch version."""
return Version(
major=self.major,
minor=self.minor,
patch=self.patch + 1
)
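Round-tripping and bumping a version; note that the bump_* methods drop pre-release and build metadata:
v = Version.parse("1.4.2-rc1+build5")
assert (v.major, v.pre_release, v.build) == (1, "rc1", "build5")
assert str(v.bump_minor()) == "1.5.0"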
"""Release management utilities."""
from datetime import datetime
from pathlib import Path
from typing import List
import subprocess
class ReleaseManager:
"""Manage software releases."""
def __init__(
self,
repo_path: Path,
version: Version
) -> None:
"""
Initialize release manager.
Args:
repo_path: Path to repository
version: Release version
"""
self.repo_path = repo_path
self.version = version
async def prepare_release(self) -> None:
"""Prepare new release."""
# Create release branch
branch_name = f"release/{self.version}"
subprocess.run(
["git", "checkout", "-b", branch_name, "develop"],
check=True,
cwd=self.repo_path
)
# Update version files
self._update_version_files()
# Generate changelog
await self.generate_changelog()
# Commit changes
subprocess.run(
[
"git", "commit", "-am",
f"chore(release): prepare {self.version}"
],
check=True,
cwd=self.repo_path
)
async def generate_changelog(self) -> None:
"""Generate changelog for release."""
changelog_path = self.repo_path / "CHANGELOG.md"
entries = await self._get_changelog_entries()
current_content = ""
if changelog_path.exists():
current_content = changelog_path.read_text()
new_content = self._format_changelog(entries)
changelog_path.write_text(
f"{new_content}\n\n{current_content}".strip()
)
async def _get_changelog_entries(self) -> List[CommitMessage]:
"""Get commits for changelog."""
result = subprocess.run(
["git", "log", "--pretty=format:%B%n<<END>>"],
capture_output=True,
text=True,
cwd=self.repo_path
)
commits = []
current_commit = []
for line in result.stdout.split("\n"):
if line == "<<END>>":
if current_commit:
message = "\n".join(current_commit)
try:
commits.append(
CommitValidator.parse(message)
)
except ValueError:
pass
current_commit = []
else:
current_commit.append(line)
return commits
"""Deprecation handling utilities."""
from __future__ import annotations
import warnings
from enum import Enum
from functools import wraps
from typing import Any, Callable, Optional
class DeprecationType(str, Enum):
"""Types of deprecation."""
FUNCTION = "function"
CLASS = "class"
PARAMETER = "parameter"
MODULE = "module"
FEATURE = "feature"
class AppDeprecationWarning(Warning):
"""Custom deprecation warning; named to avoid shadowing the builtin DeprecationWarning."""
def __init__(
self,
message: str,
*,
since_version: str,
removal_version: Optional[str] = None,
alternative: Optional[str] = None
) -> None:
"""
Initialize deprecation warning.
Args:
message: Warning message
since_version: Version when deprecated
removal_version: Version when will be removed
alternative: Alternative to use
"""
self.since_version = since_version
self.removal_version = removal_version
self.alternative = alternative
super().__init__(self._format_message(message))
def _format_message(self, base_message: str) -> str:
"""Format complete warning message."""
message_parts = [
f"Deprecated: {base_message}",
f"(since version {self.since_version})"
]
if self.removal_version:
message_parts.append(
f"Will be removed in version {self.removal_version}"
)
if self.alternative:
message_parts.append(f"Use {self.alternative} instead")
return " ".join(message_parts)
class DeprecationManager:
"""Manage deprecation process."""
@staticmethod
def deprecate(
message: str,
*,
since_version: str,
removal_version: Optional[str] = None,
alternative: Optional[str] = None,
dep_type: DeprecationType = DeprecationType.FUNCTION
) -> Callable[..., Any]:
"""
Deprecation decorator.
Args:
message: Deprecation message
since_version: Version when deprecated
removal_version: Version when will be removed
alternative: Alternative to use
dep_type: Type of deprecation
Returns:
Decorator function
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
warnings.warn(
AppDeprecationWarning(
message,
since_version=since_version,
removal_version=removal_version,
alternative=alternative
),
stacklevel=2
)
return func(*args, **kwargs)
# Add deprecation metadata
wrapper._deprecation_info = {
"message": message,
"since_version": since_version,
"removal_version": removal_version,
"alternative": alternative,
"type": dep_type
}
return wrapper
return decorator
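Applying the decorator (names and versions are illustrative); calling the function emits AppDeprecationWarning:
@DeprecationManager.deprecate(
    "fetch_user() is deprecated",
    since_version="1.2.0",
    removal_version="2.0.0",
    alternative="get_user()",
)
def fetch_user(user_id: str) -> dict:
    return {"id": user_id}

fetch_user("42")  # warns: Deprecated: fetch_user() is deprecated (since version 1.2.0) ...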
"""Migration utilities for project updates."""
from pathlib import Path
from typing import Any, Dict, List
import toml
from loguru import logger
class MigrationManager:
"""Manage project migrations."""
def __init__(
self,
project_root: Path,
from_version: str,
to_version: str
) -> None:
"""
Initialize migration manager.
Args:
project_root: Project root path
from_version: Starting version
to_version: Target version
"""
self.project_root = project_root
self.from_version = Version.parse(from_version)
self.to_version = Version.parse(to_version)
self.migrations_path = project_root / "migrations"
async def plan_migration(self) -> List[Dict[str, Any]]:
"""
Plan migration steps.
Returns:
List of migration steps
"""
migration_files = sorted(
self.migrations_path.glob("*.toml")
)
steps = []
for file in migration_files:
config = toml.load(file)
if self._is_applicable_migration(config):
steps.append(config)
return steps
async def execute_migration(
self,
dry_run: bool = False
) -> None:
"""
Execute migration steps.
Args:
dry_run: Whether to simulate migration
"""
steps = await self.plan_migration()
for step in steps:
if dry_run:
logger.info(
f"Would execute migration: {step['name']}"
)
continue
logger.info(f"Executing migration: {step['name']}")
try:
await self._execute_step(step)
await self._record_migration(step)
except Exception as e:
logger.error(
f"Migration failed: {step['name']}",
error=str(e)
)
raise
def _is_applicable_migration(
self,
config: Dict[str, Any]
) -> bool:
"""Check if migration is applicable."""
min_version = Version.parse(config["min_version"])
max_version = Version.parse(config["max_version"])
return (
self.from_version >= min_version
and self.to_version <= max_version
)
"""Documentation maintenance utilities."""
from pathlib import Path
from typing import Dict, List, Set
import re
class DocManager:
"""Manage project documentation."""
def __init__(self, docs_root: Path) -> None:
"""
Initialize documentation manager.
Args:
docs_root: Documentation root path
"""
self.docs_root = docs_root
self.api_docs_path = docs_root / "api"
self.guides_path = docs_root / "guides"
async def update_api_docs(self) -> None:
"""Update API documentation."""
modules = self._find_modules()
for module in modules:
await self._update_module_docs(module)
async def validate_docs(self) -> List[str]:
"""
Validate documentation completeness.
Returns:
List of validation issues
"""
issues = []
# Check for broken links
issues.extend(await self._check_links())
# Check for outdated versions
issues.extend(await self._check_versions())
# Check for code examples
issues.extend(await self._check_code_examples())
return issues
async def generate_changelog(self) -> None:
"""Generate changelog from git history."""
changelog_path = self.docs_root / "CHANGELOG.md"
entries = await self._get_changelog_entries()
content = "# Changelog\n\n"
for version, changes in entries.items():
content += f"## {version}\n\n"
for change in changes:
content += f"- {change}\n"
content += "\n"
changelog_path.write_text(content)
"""Backward compatibility utilities."""
from typing import Any, Callable, Dict, Optional
class CompatibilityLayer:
"""Manage backward compatibility."""
def __init__(self) -> None:
"""Initialize compatibility layer."""
self._compatibility_registry: Dict[
str,
Dict[str, Any]
] = {}
def register_compatibility(
self,
original_name: str,
new_name: str,
adapter: Optional[Callable[..., Any]] = None
) -> None:
"""
Register compatibility mapping.
Args:
original_name: Original name/path
new_name: New name/path
adapter: Optional adapter function
"""
self._compatibility_registry[original_name] = {
"new_name": new_name,
"adapter": adapter
}
def get_compatible_name(
self,
name: str
) -> str:
"""
Get current name for backward compatibility.
Args:
name: Original name
Returns:
Current compatible name
"""
if name in self._compatibility_registry:
return self._compatibility_registry[name]["new_name"]
return name
def adapt_call(
self,
original_name: str,
*args: Any,
**kwargs: Any
) -> Any:
"""
Adapt function call for compatibility.
Args:
original_name: Original function name
*args: Function arguments
**kwargs: Function keyword arguments
Returns:
Adapted function result
"""
if original_name not in self._compatibility_registry:
raise ValueError(f"No compatibility mapping for {original_name}")
mapping = self._compatibility_registry[original_name]
if mapping["adapter"]:
return mapping["adapter"](*args, **kwargs)
# Get new function
new_func = self._get_function(mapping["new_name"])
return new_func(*args, **kwargs)
"""Base service integration patterns."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, TypeVar
from pydantic import BaseModel
T = TypeVar('T', bound=BaseModel)
class ServiceConfig(BaseModel):
"""Base service configuration."""
base_url: str
timeout: int = 30
retry_attempts: int = 3
api_key: Optional[str] = None
class ServiceResponse(BaseModel):
"""Standard service response."""
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
meta: Optional[Dict[str, Any]] = None
class BaseService(ABC, Generic[T]):
"""Base service integration."""
def __init__(self, config: ServiceConfig) -> None:
"""
Initialize service.
Args:
config: Service configuration
"""
self.config = config
self._setup()
@abstractmethod
def _setup(self) -> None:
"""Setup service connection."""
pass
@abstractmethod
async def health_check(self) -> bool:
"""
Check service health.
Returns:
True if service is healthy
"""
pass
async def handle_error(
self,
error: Exception
) -> ServiceResponse:
"""
Handle service error.
Args:
error: Error to handle
Returns:
Error response
"""
return ServiceResponse(
success=False,
error=str(error)
)
"""Event handling system."""
from datetime import datetime
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional
from loguru import logger
from pydantic import BaseModel, Field
class EventPriority(int, Enum):
"""Event priority levels."""
LOW = 0
NORMAL = 1
HIGH = 2
CRITICAL = 3
class Event(BaseModel):
"""Base event structure."""
event_type: str
payload: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.utcnow)
priority: EventPriority = EventPriority.NORMAL
source: Optional[str] = None
class EventBus:
"""Event management system."""
def __init__(self) -> None:
"""Initialize event bus."""
self._handlers: Dict[str, List[Callable]] = {}
self._middlewares: List[Callable] = []
def subscribe(
self,
event_type: str,
handler: Callable[[Event], Any]
) -> None:
"""
Subscribe to event type.
Args:
event_type: Type of event
handler: Event handler function
"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
def add_middleware(
self,
middleware: Callable[[Event], Awaitable[Event]]
) -> None:
"""
Add event middleware.
Args:
middleware: Middleware function
"""
self._middlewares.append(middleware)
async def publish(self, event: Event) -> None:
"""
Publish event to subscribers.
Args:
event: Event to publish
"""
# Apply middlewares
for middleware in self._middlewares:
event = await middleware(event)
# Call handlers
if event.event_type in self._handlers:
for handler in self._handlers[event.event_type]:
try:
await handler(event)
except Exception as e:
logger.error(
f"Event handler failed: {e}",
event_type=event.event_type,
handler=handler.__name__
)
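Subscribing and publishing end to end; the event type and payload are illustrative:
import asyncio

bus = EventBus()

async def on_user_created(event: Event) -> None:
    print("created:", event.payload["user_id"])

bus.subscribe("user.created", on_user_created)
asyncio.run(bus.publish(Event(event_type="user.created", payload={"user_id": "42"})))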
"""Message queue integration."""
from typing import Any, Callable, Dict, Optional
import json
import aio_pika
from pydantic import BaseModel
class QueueConfig(BaseModel):
"""Message queue configuration."""
url: str
exchange: str
queue: str
routing_key: str
prefetch_count: int = 10
class QueueManager:
"""Manage message queue interactions."""
def __init__(self, config: QueueConfig) -> None:
"""
Initialize queue manager.
Args:
config: Queue configuration
"""
self.config = config
self.connection: Optional[aio_pika.Connection] = None
self.channel: Optional[aio_pika.Channel] = None
self.exchange: Optional[aio_pika.Exchange] = None
async def connect(self) -> None:
"""Establish queue connection."""
self.connection = await aio_pika.connect_robust(
self.config.url
)
self.channel = await self.connection.channel()
await self.channel.set_qos(
prefetch_count=self.config.prefetch_count
)
self.exchange = await self.channel.declare_exchange(
self.config.exchange,
aio_pika.ExchangeType.TOPIC
)
async def publish(
self,
message: Dict[str, Any],
routing_key: Optional[str] = None
) -> None:
"""
Publish message to queue.
Args:
message: Message to publish
routing_key: Optional custom routing key
"""
if not self.exchange:
raise RuntimeError("Queue not connected")
await self.exchange.publish(
aio_pika.Message(
body=json.dumps(message).encode()
),
routing_key=routing_key or self.config.routing_key
)
async def consume(
self,
callback: Callable[[Dict[str, Any]], Any]
) -> None:
"""
Consume messages from queue.
Args:
callback: Message handler function
"""
if not self.channel:
raise RuntimeError("Queue not connected")
queue = await self.channel.declare_queue(
self.config.queue
)
async def process_message(
message: aio_pika.IncomingMessage
) -> None:
"""Process incoming message."""
async with message.process():
payload = json.loads(message.body.decode())
await callback(payload)
await queue.consume(process_message)
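Connecting and publishing; the broker URL and routing values are placeholders, and a reachable RabbitMQ instance is assumed:
import asyncio

async def main() -> None:
    queue = QueueManager(QueueConfig(
        url="amqp://guest:guest@localhost/",  # placeholder broker URL
        exchange="events",
        queue="worker",
        routing_key="events.default",
    ))
    await queue.connect()
    await queue.publish({"task": "resize", "image_id": 7})

asyncio.run(main())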
"""Webhook management system."""
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel, Field
import hmac
import hashlib
class WebhookConfig(BaseModel):
"""Webhook configuration."""
endpoint: str
secret: str
events: List[str]
timeout: int = 30
max_retries: int = 3
class WebhookPayload(BaseModel):
"""Standard webhook payload."""
event: str
data: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.utcnow)
signature: Optional[str] = None
class WebhookManager:
"""Manage webhook operations."""
def __init__(self) -> None:
"""Initialize webhook manager."""
self._handlers: Dict[str, List[WebhookConfig]] = {}
self._validators: Dict[str, Callable[[Dict[str, Any]], bool]] = {}
def register_webhook(
self,
config: WebhookConfig,
events: Optional[List[str]] = None
) -> None:
"""
Register webhook configuration.
Args:
config: Webhook configuration
events: Optional event list override
"""
for event in events or config.events:
if event not in self._handlers:
self._handlers[event] = []
self._handlers[event].append(config)
def add_validator(
self,
event: str,
validator: Callable[[Dict[str, Any]], bool]
) -> None:
"""
Add payload validator.
Args:
event: Event type
validator: Validation function
"""
self._validators[event] = validator
async def trigger_webhook(
self,
event: str,
data: Dict[str, Any]
) -> None:
"""
Trigger webhook for event.
Args:
event: Event type
data: Event data
"""
if event not in self._handlers:
return
# Validate payload
if event in self._validators:
if not self._validators[event](data):
raise ValueError(f"Invalid payload for {event}")
payload = WebhookPayload(
event=event,
data=data
)
for config in self._handlers[event]:
try:
await self._send_webhook(config, payload)
except Exception as e:
logger.error(
f"Webhook delivery failed: {e}",
event=event,
endpoint=config.endpoint
)
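The hmac/hashlib imports above support payload signing inside the elided _send_webhook; a minimal sketch of how that signature could be computed and verified (the helper names are assumptions, not part of the original):
def sign_payload(secret: str, body: bytes) -> str:
    """HMAC-SHA256 signature over the raw request body (assumed scheme)."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    # Constant-time comparison to resist timing attacks.
    return hmac.compare_digest(sign_payload(secret, body), signature)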
"""Core CLI application structure."""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
class CLIContext:
"""CLI application context."""
def __init__(self) -> None:
"""Initialize CLI context."""
self.console = Console()
self.verbose: bool = False
self.config_path: Optional[Path] = None
self.debug: bool = False
class BaseCLI:
"""Base CLI application."""
def __init__(self) -> None:
"""Initialize CLI application."""
self.app = typer.Typer(
help="CLI application description",
context_settings={"help_option_names": ["-h", "--help"]}
)
self.ctx = CLIContext()
self._register_commands()
def _register_commands(self) -> None:
"""Register CLI commands."""
# Core commands
self.app.command()(self.version)
self.app.command()(self.config)
# Command groups
self.app.add_typer(
self._get_user_commands(),
name="user",
help="User management commands"
)
def version(self) -> None:
"""Show version information."""
self.ctx.console.print(
f"[bold]Version:[/] {self._get_version()}"
)
def config(
self,
show: bool = typer.Option(
False,
"--show",
help="Show current configuration"
),
edit: bool = typer.Option(
False,
"--edit",
help="Edit configuration"
)
) -> None:
"""Manage configuration."""
if show:
self._show_config()
elif edit:
self._edit_config()
"""User interaction utilities."""
from typing import Any, Callable, Dict, List, Optional
from rich.console import Console
from rich.prompt import Confirm, Prompt
from rich.table import Table
class InteractionManager:
"""Manage user interactions."""
def __init__(self, console: Console) -> None:
"""
Initialize interaction manager.
Args:
console: Rich console instance
"""
self.console = console
async def prompt_input(
self,
message: str,
*,
default: Optional[str] = None,
password: bool = False,
validate: Optional[Callable[[str], bool]] = None
) -> str:
"""
Prompt user for input.
Args:
message: Prompt message
default: Default value
password: Whether input is password
validate: Optional validation function
Returns:
User input
"""
while True:
value = Prompt.ask(
message,
default=default,
password=password
)
if validate and not validate(value):
self.console.print(
"[red]Invalid input. Please try again.[/]"
)
continue
return value
def display_table(
self,
data: List[Dict[str, Any]],
columns: List[str],
title: Optional[str] = None
) -> None:
"""
Display data in table format.
Args:
data: List of data dictionaries
columns: Column names
title: Optional table title
"""
table = Table(title=title)
for column in columns:
table.add_column(column, style="cyan")
for row in data:
table.add_row(*[str(row.get(col, "")) for col in columns])
self.console.print(table)
def confirm_action(
self,
message: str,
default: bool = False
) -> bool:
"""
Confirm user action.
Args:
message: Confirmation message
default: Default response
Returns:
User confirmation
"""
return Confirm.ask(message, default=default)
"""Progress tracking utilities."""
from contextlib import contextmanager
from typing import Iterator
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TimeRemainingColumn,
TaskID
)
class ProgressManager:
"""Manage progress indicators."""
def __init__(self, console: Console) -> None:
"""
Initialize progress manager.
Args:
console: Rich console instance
"""
self.console = console
@contextmanager
def progress_bar(
self,
total: int,
description: str = "Processing"
) -> Iterator[tuple[Progress, TaskID]]:
"""
Create progress bar context.
Args:
total: Total steps
description: Progress description
Yields:
Tuple of (progress bar, task ID)
"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=self.console
) as progress:
yield progress, progress.add_task(description, total=total)
@contextmanager
def spinner(
self,
message: str = "Working"
) -> Iterator[None]:
"""
Create spinner context.
Args:
message: Spinner message
Yields:
None
"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=self.console,
transient=True
) as progress:
progress.add_task(message)
yield
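With the tuple-yielding progress_bar above, a caller drives updates like this:
console = Console()
progress_mgr = ProgressManager(console)

with progress_mgr.progress_bar(total=100, description="Importing") as (progress, task_id):
    for _ in range(100):
        progress.update(task_id, advance=1)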
"""CLI configuration management."""
from pathlib import Path
from typing import Any, Dict, Optional
import toml
from pydantic import BaseModel
class CLIConfig(BaseModel):
"""CLI configuration model."""
api_url: str
api_key: Optional[str] = None
default_format: str = "json"
color: bool = True
timeout: int = 30
class ConfigManager:
"""Manage CLI configuration."""
def __init__(
self,
config_path: Optional[Path] = None
) -> None:
"""
Initialize configuration manager.
Args:
config_path: Path to config file
"""
self.config_path = config_path or Path.home() / ".myapp" / "config.toml"
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self._load_config()
def _load_config(self) -> None:
"""Load configuration from file."""
if self.config_path.exists():
self.config = CLIConfig(
**toml.load(self.config_path)
)
else:
self.config = CLIConfig(api_url="http://localhost:8000")
def save_config(self) -> None:
"""Save configuration to file."""
self.config_path.write_text(
toml.dumps(self.config.model_dump())
)
def update_config(
self,
updates: Dict[str, Any]
) -> None:
"""
Update configuration values.
Args:
updates: Configuration updates
"""
current_config = self.config.model_dump()
current_config.update(updates)
self.config = CLIConfig(**current_config)
self.save_config()
"""CLI plugin system."""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Callable, Dict, Optional
import importlib.util
import pkg_resources  # importlib.metadata is the modern stdlib alternative
class CLIPlugin(ABC):
"""Base CLI plugin."""
@abstractmethod
def get_commands(self) -> Dict[str, Callable[..., Any]]:
"""
Get plugin commands.
Returns:
Dictionary of command name to function
"""
pass
@abstractmethod
def initialize(self, cli: BaseCLI) -> None:
"""
Initialize plugin.
Args:
cli: CLI application instance
"""
pass
class PluginManager:
"""Manage CLI plugins."""
def __init__(self, plugin_dir: Optional[Path] = None) -> None:
"""
Initialize plugin manager.
Args:
plugin_dir: Optional plugin directory
"""
self.plugin_dir = plugin_dir or Path.home() / ".myapp" / "plugins"
self.plugin_dir.mkdir(parents=True, exist_ok=True)
self.plugins: Dict[str, CLIPlugin] = {}
def load_plugins(self) -> None:
"""Load available plugins."""
# Load from plugin directory
for plugin_path in self.plugin_dir.glob("*.py"):
self._load_plugin_from_file(plugin_path)
# Load from installed packages
for entry_point in pkg_resources.iter_entry_points("myapp.plugins"):
self._load_plugin_from_entry_point(entry_point)
def _load_plugin_from_file(self, path: Path) -> None:
"""Load plugin from file."""
spec = importlib.util.spec_from_file_location(
path.stem, path
)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
for item in dir(module):
obj = getattr(module, item)
if (
isinstance(obj, type)
and issubclass(obj, CLIPlugin)
and obj != CLIPlugin
):
plugin = obj()
self.plugins[path.stem] = plugin
def register_plugin(
self,
name: str,
plugin: CLIPlugin
) -> None:
"""
Register new plugin.
Args:
name: Plugin name
plugin: Plugin instance
"""
self.plugins[name] = plugin
"""Domain-Driven Design implementation patterns."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar
from pydantic import BaseModel, ConfigDict, PrivateAttr
T = TypeVar('T', bound=BaseModel)
class Entity(BaseModel):
"""Base entity with identity."""
id: str
def __eq__(self, other: object) -> bool:
"""Equal if IDs match."""
if not isinstance(other, Entity):
return False
return self.id == other.id
class ValueObject(BaseModel):
"""Immutable value object."""
model_config = ConfigDict(frozen=True)
class DomainEvent(BaseModel):
"""Base domain event."""
event_type: str
class AggregateRoot(Entity):
"""Root entity of an aggregate."""
# Pydantic forbids assigning undeclared attributes, so events live in a PrivateAttr.
_events: List[DomainEvent] = PrivateAttr(default_factory=list)
def add_event(self, event: DomainEvent) -> None:
"""Add domain event."""
self._events.append(event)
class Repository(ABC, Generic[T]):
"""Base repository interface."""
@abstractmethod
async def get_by_id(self, id: str) -> Optional[T]:
"""Get entity by ID."""
pass
@abstractmethod
async def save(self, entity: T) -> None:
"""Save entity."""
pass
class DomainService:
"""Domain logic that doesn't belong to entities."""
def __init__(self, repository: Repository[T]) -> None:
"""Initialize domain service."""
self.repository = repository
"""Clean Architecture implementation."""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Protocol
class UseCase(ABC):
"""Application use case interface."""
@abstractmethod
async def execute(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute use case."""
pass
class Repository(Protocol):
"""Data access interface."""
async def get(self, id: str) -> Any:
"""Get entity."""
...
async def save(self, entity: Any) -> None:
"""Save entity."""
...
class Presenter(Protocol):
"""Presentation interface."""
def present(self, response: Dict[str, Any]) -> Any:
"""Present response."""
...
class Controller:
"""Input boundary."""
def __init__(
self,
use_case: UseCase,
presenter: Presenter
) -> None:
"""Initialize controller."""
self.use_case = use_case
self.presenter = presenter
async def handle(
self,
request: Dict[str, Any]
) -> Any:
"""Handle input."""
response = await self.use_case.execute(request)
return self.presenter.present(response)
"""Microservices design patterns."""
from typing import Any, Callable, Dict, List
from pydantic import BaseModel
import random
class ServiceUnavailableError(Exception):
"""Raised when no healthy service instance is available."""
class RouteNotFoundError(Exception):
"""Raised when no route matches the requested path."""
class ServiceDiscovery:
"""Service discovery implementation."""
def __init__(self, registry_url: str) -> None:
"""Initialize service discovery."""
self.registry_url = registry_url
self._services: Dict[str, List[str]] = {}
async def register(
self,
service_name: str,
instance_url: str
) -> None:
"""Register service instance."""
if service_name not in self._services:
self._services[service_name] = []
self._services[service_name].append(instance_url)
async def discover(self, service_name: str) -> List[str]:
"""Discover service instances."""
return self._services.get(service_name, [])
class CircuitBreakerConfig(BaseModel):
"""Circuit breaker configuration."""
failure_threshold: int
recovery_timeout: int
half_open_timeout: int
class CircuitBreaker:
"""Circuit breaker pattern."""
def __init__(self, config: CircuitBreakerConfig) -> None:
"""Initialize circuit breaker."""
self.config = config
self.failures = 0
self.state = "closed"
async def execute(
self,
func: Callable[..., Any]
) -> Any:
"""Execute with circuit breaker."""
if self.state == "open":
raise ServiceUnavailableError()
try:
result = await func()
self.on_success()
return result
except Exception:
self.on_failure()
raise
def on_success(self) -> None:
"""Reset failures and close the circuit after a success."""
self.failures = 0
self.state = "closed"
def on_failure(self) -> None:
"""Record a failure and open the circuit at the threshold."""
self.failures += 1
if self.failures >= self.config.failure_threshold:
self.state = "open"
class APIGateway:
"""API Gateway pattern."""
def __init__(
self,
discovery: ServiceDiscovery
) -> None:
"""Initialize API gateway."""
self.discovery = discovery
self.routes: Dict[str, str] = {}
async def register_route(
self,
path: str,
service: str
) -> None:
"""Register route."""
self.routes[path] = service
async def route_request(
self,
path: str,
request: Dict[str, Any]
) -> Any:
"""Route request to service."""
service = self.routes.get(path)
if not service:
raise RouteNotFoundError()
instances = await self.discovery.discover(service)
if not instances:
raise ServiceUnavailableError()
# Load balancing logic here
instance = random.choice(instances)
return await self._forward_request(instance, request)
"""CQRS (Command Query Responsibility Segregation) pattern."""
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Type, TypeVar
from pydantic import BaseModel
T = TypeVar('T', bound=BaseModel)
class HandlerNotFoundError(Exception):
"""Raised when no handler is registered for a message type."""
class Command(BaseModel):
"""Base command class."""
pass
class Query(BaseModel):
"""Base query class."""
pass
class CommandHandler(ABC, Generic[T]):
"""Base command handler."""
@abstractmethod
async def handle(self, command: T) -> None:
"""Handle command."""
pass
class QueryHandler(ABC, Generic[T]):
"""Base query handler."""
@abstractmethod
async def handle(self, query: T) -> Any:
"""Handle query."""
pass
class CommandBus:
"""Command bus implementation."""
def __init__(self) -> None:
"""Initialize command bus."""
self._handlers: Dict[
Type[Command],
CommandHandler
] = {}
def register(
self,
command_type: Type[Command],
handler: CommandHandler
) -> None:
"""Register command handler."""
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> None:
"""Dispatch command to handler."""
handler = self._handlers.get(type(command))
if not handler:
raise HandlerNotFoundError()
await handler.handle(command)
class QueryBus:
"""Query bus implementation."""
def __init__(self) -> None:
"""Initialize query bus."""
self._handlers: Dict[
Type[Query],
QueryHandler
] = {}
def register(
self,
query_type: Type[Query],
handler: QueryHandler
) -> None:
"""Register query handler."""
self._handlers[query_type] = handler
async def dispatch(self, query: Query) -> Any:
"""Dispatch query to handler."""
handler = self._handlers.get(type(query))
if not handler:
raise HandlerNotFoundError()
return await handler.handle(query)
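Registering and dispatching a command end to end; the command and handler are illustrative:
import asyncio

class CreateUser(Command):
    username: str

class CreateUserHandler(CommandHandler[CreateUser]):
    async def handle(self, command: CreateUser) -> None:
        print(f"creating {command.username}")

bus = CommandBus()
bus.register(CreateUser, CreateUserHandler())
asyncio.run(bus.dispatch(CreateUser(username="alice")))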
// .vscode/settings.json
{
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.ruffEnabled": true,
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.nosetestsEnabled": false,
"python.formatting.provider": "ruff",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"python.analysis.typeCheckingMode": "strict",
"[python]": {
"editor.rulers": [
88
],
"editor.tabSize": 4,
"editor.insertSpaces": true,
"editor.detectIndentation": false
}
}
// .vscode/launch.json
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
},
{
"name": "Python: Debug Tests",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/.venv/bin/pytest",
"args": [
"-v",
"--no-cov"
],
"console": "integratedTerminal",
"justMyCode": false
}
]
}
# .pre-commit-config.yaml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.3
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.5.1
hooks:
- id: mypy
additional_dependencies: [types-all]
args: [--strict]
- repo: local
hooks:
- id: pytest
name: pytest
entry: pytest
language: system
types: [python]
pass_filenames: false
"""cProfile integration utilities."""
from functools import wraps
import cProfile
import pstats
from typing import Any, Callable, Optional
from pathlib import Path
class Profiler:
"""Performance profiling utility."""
def __init__(
self,
profile_dir: Optional[Path] = None
) -> None:
"""
Initialize profiler.
Args:
profile_dir: Directory for profile outputs
"""
self.profile_dir = profile_dir or Path("profiles")
self.profile_dir.mkdir(parents=True, exist_ok=True)
def profile(
self,
output_file: Optional[str] = None
) -> Callable[..., Any]:
"""
Profile decorator.
Args:
output_file: Optional output file name
Returns:
Decorated function
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
profile = cProfile.Profile()
try:
return profile.runcall(func, *args, **kwargs)
finally:
if output_file:
profile_path = self.profile_dir / output_file
profile.dump_stats(str(profile_path))
stats = pstats.Stats(profile)
stats.sort_stats('cumulative')
stats.print_stats()
return wrapper
return decorator
class ProfileAnalyzer:
"""Analyze profile results."""
def __init__(self, profile_path: Path) -> None:
"""
Initialize analyzer.
Args:
profile_path: Path to profile stats
"""
self.stats = pstats.Stats(str(profile_path))
def print_hotspots(
self,
limit: int = 10
) -> None:
"""
Print performance hotspots.
Args:
limit: Number of results to show
"""
self.stats.sort_stats('cumulative')
self.stats.print_stats(limit)
def print_callers(
self,
function_name: str
) -> None:
"""
Print callers of a function.
Args:
function_name: Function to analyze
"""
self.stats.print_callers(function_name)
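Profiling a workload and inspecting hotspots; the file name and workload are illustrative:
profiler = Profiler(profile_dir=Path("profiles"))

@profiler.profile(output_file="import_users.prof")
def import_users() -> None:
    sum(i * i for i in range(1_000_000))  # stand-in workload

import_users()
ProfileAnalyzer(Path("profiles") / "import_users.prof").print_hotspots(limit=5)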
"""Memory profiling utilities."""
from typing import Any, Callable, List, Optional
from functools import wraps
import asyncio
import tracemalloc
from dataclasses import dataclass
from datetime import datetime
from loguru import logger
@dataclass
class MemorySnapshot:
"""Memory snapshot data."""
timestamp: datetime
current: int
peak: int
traces: List[tracemalloc.Statistic]  # top statistics from Snapshot.statistics()
class MemoryProfiler:
"""Memory usage profiling utility."""
def __init__(self) -> None:
"""Initialize memory profiler."""
self.snapshots: List[MemorySnapshot] = []
def start(self) -> None:
"""Start memory tracking."""
tracemalloc.start()
def stop(self) -> None:
"""Stop memory tracking."""
tracemalloc.stop()
def take_snapshot(self) -> MemorySnapshot:
"""
Take memory snapshot.
Returns:
Memory snapshot data
"""
snapshot = tracemalloc.take_snapshot()
stats = snapshot.statistics('lineno')
current, peak = tracemalloc.get_traced_memory()
snapshot_data = MemorySnapshot(
timestamp=datetime.now(),
current=current,
peak=peak,
traces=stats[:10] # Top 10 memory users
)
self.snapshots.append(snapshot_data)
return snapshot_data
def compare_snapshots(
self,
start_index: int,
end_index: int
) -> None:
"""
Compare two snapshots.
Args:
start_index: First snapshot index
end_index: Second snapshot index
"""
if not (0 <= start_index < len(self.snapshots)):
raise ValueError("Invalid start index")
if not (0 <= end_index < len(self.snapshots)):
raise ValueError("Invalid end index")
start = self.snapshots[start_index]
end = self.snapshots[end_index]
print(f"Memory change: {end.current - start.current:,} bytes")
print(f"Peak change: {end.peak - start.peak:,} bytes")
# Show the top memory consumers recorded in the later snapshot
for trace in end.traces:
print(f"{trace.size:,} bytes ({trace.count} blocks): {trace.traceback}")
class MemoryTracker:
"""Track memory usage of specific operations."""
def __init__(self) -> None:
"""Initialize memory tracker."""
self.profiler = MemoryProfiler()
def track(
self,
label: str
) -> Callable[..., Any]:
"""
Track memory usage decorator.
"""
Track memory usage decorator.
Args:
label: Operation label
Returns:
Decorated function
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
self.profiler.start()
try:
result = func(*args, **kwargs)
snapshot = self.profiler.take_snapshot()
print(f"\nMemory Usage ({label}):")
print(f"Current: {snapshot.current:,} bytes")
print(f"Peak: {snapshot.peak:,} bytes")
return result
finally:
self.profiler.stop()
return wrapper
return decorator
class MemoryMonitor:
"""Continuous memory monitoring."""
def __init__(
self,
threshold_mb: float = 100
) -> None:
"""
Initialize memory monitor.
Args:
threshold_mb: Memory threshold in MB
"""
self.threshold_bytes = threshold_mb * 1024 * 1024  # convert MB to bytes
self.profiler = MemoryProfiler()
async def monitor(
self,
interval: float = 1.0
) -> None:
"""
Monitor memory usage.
Args:
interval: Check interval in seconds
"""
self.profiler.start()
try:
while True:
snapshot = self.profiler.take_snapshot()
if snapshot.current > self.threshold_bytes:
logger.warning(
"Memory threshold exceeded",
current=snapshot.current,
threshold=self.threshold_bytes
)
)
await asyncio.sleep(interval)
finally:
self.profiler.stop()
"""Docker Swarm management utilities."""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import subprocess
from pathlib import Path
class ServiceState(str, Enum):
"""Service states."""
RUNNING = "running"
STOPPED = "stopped"
FAILED = "failed"
UPDATING = "updating"
@dataclass
class ServiceConfig:
"""Service configuration."""
name: str
image: str
replicas: int = 1
env: Optional[Dict[str, str]] = None
ports: Optional[Dict[int, int]] = None
networks: Optional[List[str]] = None
volumes: Optional[Dict[str, str]] = None
constraints: Optional[List[str]] = None
class SwarmManager:
"""Manage Docker Swarm services."""
def __init__(self) -> None:
"""Initialize swarm manager."""
self._init_swarm()
def _init_swarm(self) -> None:
"""Initialize Docker Swarm."""
try:
subprocess.run(
["docker", "swarm", "init"],
check=True,
capture_output=True
)
except subprocess.CalledProcessError as e:
if b"already part of a swarm" not in e.stderr:
raise
def deploy_service(
self,
config: ServiceConfig
) -> None:
"""
Deploy service to swarm.
Args:
config: Service configuration
"""
command = [
"docker", "service", "create",
"--name", config.name,
"--replicas", str(config.replicas)
]
if config.env:
for key, value in config.env.items():
command.extend(["--env", f"{key}={value}"])
if config.ports:
for host_port, container_port in config.ports.items():
command.extend(
["--publish", f"{host_port}:{container_port}"]
)
if config.networks:
for network in config.networks:
command.extend(["--network", network])
if config.volumes:
for host_path, container_path in config.volumes.items():
command.extend(
["--mount", f"type=bind,src={host_path},dst={container_path}"]
)
if config.constraints:
for constraint in config.constraints:
command.extend(["--constraint", constraint])
command.append(config.image)
subprocess.run(command, check=True)
def scale_service(
self,
service_name: str,
replicas: int
) -> None:
"""
Scale service replicas.
Args:
service_name: Service to scale
replicas: Number of replicas
"""
subprocess.run(
[
"docker", "service", "scale",
f"{service_name}={replicas}"
],
check=True
)
def get_service_status(
self,
service_name: str
) -> ServiceState:
"""
Get service status.
Args:
service_name: Service to check
Returns:
Service state
"""
result = subprocess.run(
["docker", "service", "ps", service_name],
capture_output=True,
text=True
)
if "running" in result.stdout.lower():
return ServiceState.RUNNING
if "failed" in result.stdout.lower():
return ServiceState.FAILED
if "shutdown" in result.stdout.lower():
return ServiceState.STOPPED
return ServiceState.UPDATING
class SwarmNetwork:
"""Manage Docker Swarm networks."""
@staticmethod
def create_network(
name: str,
driver: str = "overlay",
attachable: bool = True
) -> None:
"""
Create overlay network.
Args:
name: Network name
driver: Network driver
attachable: Whether non-swarm services can attach
"""
command = ["docker", "network", "create"]
if driver:
command.extend(["--driver", driver])
if attachable:
command.append("--attachable")
command.append(name)
subprocess.run(command, check=True)
@staticmethod
def remove_network(name: str) -> None:
"""
Remove network.
Args:
name: Network name
"""
subprocess.run(
["docker", "network", "rm", name],
check=True
)
class SwarmSecret:
"""Manage Docker Swarm secrets."""
@staticmethod
def create_secret(
name: str,
data: str,
labels: Optional[Dict[str, str]] = None
) -> None:
"""
Create secret.
Args:
name: Secret name
data: Secret data
labels: Optional labels
"""
command = ["docker", "secret", "create"]
if labels:
for key, value in labels.items():
command.extend(["--label", f"{key}={value}"])
command.append(name)
command.append("-")
subprocess.run(
command,
input=data.encode(),
check=True
)
@staticmethod
def remove_secret(name: str) -> None:
"""
Remove secret.
Args:
name: Secret name
"""
subprocess.run(
["docker", "secret", "rm", name],
check=True
)
class SwarmStack:
"""Manage Docker Swarm stacks."""
def __init__(
self,
stack_dir: Path
) -> None:
"""
Initialize stack manager.
Args:
stack_dir: Stack configuration directory
"""
self.stack_dir = stack_dir
def deploy_stack(
self,
name: str,
compose_file: str = "docker-compose.yml"
) -> None:
"""
Deploy stack to swarm.
Args:
name: Stack name
compose_file: Compose file name
"""
compose_path = self.stack_dir / compose_file
subprocess.run(
[
"docker", "stack", "deploy",
"--compose-file", str(compose_path),
name
],
check=True
)
def remove_stack(self, name: str) -> None:
"""
Remove stack.
Args:
name: Stack name
"""
subprocess.run(
["docker", "stack", "rm", name],
check=True
)
def get_stack_services(
self,
name: str
) -> List[str]:
"""
Get stack services.
Args:
name: Stack name
Returns:
List of service names
"""
result = subprocess.run(
["docker", "stack", "services", name],
capture_output=True,
text=True
)
services = []
for line in result.stdout.splitlines()[1:]:
services.append(line.split()[1])
return services
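Deploying and scaling a service with the managers above; the image name, ports, and environment are placeholders:
manager = SwarmManager()
manager.deploy_service(ServiceConfig(
    name="api",
    image="myorg/api:1.0.0",  # placeholder image
    replicas=3,
    ports={8080: 8000},
    env={"LOG_LEVEL": "info"},
))
manager.scale_service("api", replicas=5)
print(manager.get_service_status("api"))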
"""
Track memory usage decorator.
Args:
label: Operation label
Returns:
Decorated function
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
self.profiler.start()
try:
result = func(*args, **kwargs)
snapshot = self.profiler.take_snapshot()
print(f"\nMemory Usage ({label}):")
print(f"Current: {snapshot.current:,} bytes")
print(f"Peak: {snapshot.peak:,} bytes")
return result
finally:
self.profiler.stop()
return wrapper
return decorator
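
A minimal usage sketch, assuming a hypothetical owning class MemoryTracker that defines track_memory as above and creates self.profiler = MemoryProfiler() in its initializer:

tracker = MemoryTracker()  # hypothetical owner of track_memory

@tracker.track_memory(label="build_index")
def build_index(items: list[str]) -> dict[str, int]:
    return {item: pos for pos, item in enumerate(items)}

build_index(["a", "b", "c"])  # prints current and peak memory usage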
class MemoryMonitor:
    """Continuous memory monitoring."""

    def __init__(
        self,
        threshold_mb: float = 100
    ) -> None:
        """
        Initialize memory monitor.

        Args:
            threshold_mb: Memory threshold in MB
        """
        self.threshold_bytes = threshold_mb * 1024 * 1024  # Convert MB to bytes
        self.profiler = MemoryProfiler()

    async def monitor(
        self,
        interval: float = 1.0
    ) -> None:
        """
        Monitor memory usage.

        Args:
            interval: Check interval in seconds
        """
        self.profiler.start()
        try:
            while True:
                snapshot = self.profiler.take_snapshot()
                if snapshot.current > self.threshold_bytes:
                    logger.warning(
                        "Memory threshold exceeded",
                        current=snapshot.current,
                        threshold=self.threshold_bytes
                    )
                await asyncio.sleep(interval)
        finally:
            self.profiler.stop()
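
A short sketch of running the monitor alongside application work; do_work is a placeholder and the threshold is illustrative:

import asyncio
import contextlib

async def do_work() -> None:
    await asyncio.sleep(10)  # stand-in for real application work

async def main() -> None:
    monitor = MemoryMonitor(threshold_mb=256)
    task = asyncio.create_task(monitor.monitor(interval=5.0))
    try:
        await do_work()
    finally:
        task.cancel()  # MemoryMonitor.monitor stops the profiler in its finally block
        with contextlib.suppress(asyncio.CancelledError):
            await task

asyncio.run(main())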
"""Docker Swarm management utilities."""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import subprocess
from pathlib import Path
class ServiceState(str, Enum):
"""Service states."""
RUNNING = "running"
STOPPED = "stopped"
FAILED = "failed"
UPDATING = "updating"
@dataclass
class ServiceConfig:
"""Service configuration."""
name: str
image: str
replicas: int = 1
env: Optional[Dict[str, str]] = None
ports: Optional[Dict[int, int]] = None
networks: Optional[List[str]] = None
volumes: Optional[Dict[str, str]] = None
constraints: Optional[List[str]] = None

class SwarmManager:
    """Manage Docker Swarm services."""

    def __init__(self) -> None:
        """Initialize swarm manager."""
        self._init_swarm()

    def _init_swarm(self) -> None:
        """Initialize Docker Swarm, tolerating an already-initialized node."""
        try:
            subprocess.run(
                ["docker", "swarm", "init"],
                check=True,
                capture_output=True
            )
        except subprocess.CalledProcessError as e:
            # Re-raise unless the node is already a swarm member.
            if b"already part of a swarm" not in e.stderr:
                raise

    def deploy_service(
        self,
        config: ServiceConfig
    ) -> None:
        """
        Deploy service to swarm.

        Args:
            config: Service configuration
        """
        command = [
            "docker", "service", "create",
            "--name", config.name,
            "--replicas", str(config.replicas)
        ]
        if config.env:
            for key, value in config.env.items():
                command.extend(["--env", f"{key}={value}"])
        if config.ports:
            for host_port, container_port in config.ports.items():
                command.extend(
                    ["--publish", f"{host_port}:{container_port}"]
                )
        if config.networks:
            for network in config.networks:
                command.extend(["--network", network])
        if config.volumes:
            for host_path, container_path in config.volumes.items():
                command.extend(
                    ["--mount", f"type=bind,src={host_path},dst={container_path}"]
                )
        if config.constraints:
            for constraint in config.constraints:
                command.extend(["--constraint", constraint])
        # The image is the final positional argument.
        command.append(config.image)
        subprocess.run(command, check=True)

    def scale_service(
        self,
        service_name: str,
        replicas: int
    ) -> None:
        """
        Scale service replicas.

        Args:
            service_name: Service to scale
            replicas: Number of replicas
        """
        subprocess.run(
            [
                "docker", "service", "scale",
                f"{service_name}={replicas}"
            ],
            check=True
        )

    def get_service_status(
        self,
        service_name: str
    ) -> ServiceState:
        """
        Get service status.

        Args:
            service_name: Service to check

        Returns:
            Service state
        """
        result = subprocess.run(
            ["docker", "service", "ps", service_name],
            capture_output=True,
            text=True,
            check=True  # fail loudly for unknown services
        )
        # Coarse substring matching over the task table; the first
        # matching state wins, and anything else is treated as updating.
        output = result.stdout.lower()
        if "running" in output:
            return ServiceState.RUNNING
        if "failed" in output:
            return ServiceState.FAILED
        if "shutdown" in output:
            return ServiceState.STOPPED
        return ServiceState.UPDATING
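
A usage sketch; the image, ports, and placement constraint are illustrative:

manager = SwarmManager()
web_config = ServiceConfig(
    name="web",
    image="nginx:1.25",
    replicas=3,
    ports={8080: 80},  # host port -> container port
    env={"LOG_LEVEL": "info"},
    constraints=["node.role==worker"],
)
manager.deploy_service(web_config)
manager.scale_service("web", replicas=5)
print(manager.get_service_status("web"))  # e.g. ServiceState.RUNNING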

class SwarmNetwork:
    """Manage Docker Swarm networks."""

    @staticmethod
    def create_network(
        name: str,
        driver: str = "overlay",
        attachable: bool = True
    ) -> None:
        """
        Create overlay network.

        Args:
            name: Network name
            driver: Network driver
            attachable: Whether standalone (non-swarm) containers can attach
        """
        command = ["docker", "network", "create"]
        if driver:
            command.extend(["--driver", driver])
        if attachable:
            command.append("--attachable")
        command.append(name)
        subprocess.run(command, check=True)

    @staticmethod
    def remove_network(name: str) -> None:
        """
        Remove network.

        Args:
            name: Network name
        """
        subprocess.run(
            ["docker", "network", "rm", name],
            check=True
        )
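
For example, pairing the network helpers with a service deployment (names illustrative; reuses the manager instance from the sketch above):

SwarmNetwork.create_network("backend")  # overlay and attachable by default
manager.deploy_service(
    ServiceConfig(name="api", image="myorg/api:1.0", networks=["backend"])
)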

class SwarmSecret:
    """Manage Docker Swarm secrets."""

    @staticmethod
    def create_secret(
        name: str,
        data: str,
        labels: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Create secret.

        Args:
            name: Secret name
            data: Secret data
            labels: Optional labels
        """
        command = ["docker", "secret", "create"]
        if labels:
            for key, value in labels.items():
                command.extend(["--label", f"{key}={value}"])
        command.append(name)
        # "-" tells Docker to read the secret payload from stdin, so the
        # value never touches the filesystem or the process arguments.
        command.append("-")
        subprocess.run(
            command,
            input=data.encode(),
            check=True
        )

    @staticmethod
    def remove_secret(name: str) -> None:
        """
        Remove secret.

        Args:
            name: Secret name
        """
        subprocess.run(
            ["docker", "secret", "rm", name],
            check=True
        )
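
A usage sketch; the secret value is read from the environment because real secrets should never be hard-coded:

import os

SwarmSecret.create_secret(
    "db_password",
    data=os.environ["DB_PASSWORD"],  # illustrative source; never hard-code secrets
    labels={"env": "prod"},
)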

class SwarmStack:
    """Manage Docker Swarm stacks."""

    def __init__(
        self,
        stack_dir: Path
    ) -> None:
        """
        Initialize stack manager.

        Args:
            stack_dir: Stack configuration directory
        """
        self.stack_dir = stack_dir

    def deploy_stack(
        self,
        name: str,
        compose_file: str = "docker-compose.yml"
    ) -> None:
        """
        Deploy stack to swarm.

        Args:
            name: Stack name
            compose_file: Compose file name
        """
        compose_path = self.stack_dir / compose_file
        subprocess.run(
            [
                "docker", "stack", "deploy",
                "--compose-file", str(compose_path),
                name
            ],
            check=True
        )

    def remove_stack(self, name: str) -> None:
        """
        Remove stack.

        Args:
            name: Stack name
        """
        subprocess.run(
            ["docker", "stack", "rm", name],
            check=True
        )

    def get_stack_services(
        self,
        name: str
    ) -> List[str]:
        """
        Get stack services.

        Args:
            name: Stack name

        Returns:
            List of service names
        """
        result = subprocess.run(
            ["docker", "stack", "services", name],
            capture_output=True,
            text=True,
            check=True
        )
        # Skip the header row; the NAME column is the second field.
        services = []
        for line in result.stdout.splitlines()[1:]:
            fields = line.split()
            if len(fields) > 1:
                services.append(fields[1])
        return services
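
Finally, a stack deployment sketch; the directory and stack name are illustrative:

stack = SwarmStack(Path("deploy"))  # directory containing docker-compose.yml
stack.deploy_stack("myapp")
print(stack.get_stack_services("myapp"))  # e.g. ['myapp_web', 'myapp_db']
stack.remove_stack("myapp")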