Last active
December 4, 2024 19:45
-
-
Save AliYmn/2a27a4165525cad128e6f4d94dbb4932 to your computer and use it in GitHub Desktop.
High-Performance API Signature Validation: A FastAPI Journey 🚀
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
from functools import lru_cache | |
from json.decoder import JSONDecodeError | |
from typing import Any, Dict, Optional, Type | |
from fastapi import Request | |
from sqlalchemy import select | |
from libs.db import get_db_session | |
from libs.exceptions import ErrorCode | |
from libs.models import Operator | |
class SignatureService: | |
 __slots__ = ("request", "exception_class") | |
 _secret_key_cache = {} # Class-level cache | |
def __init__(self, request: Request, exception_class: Type[Exception]) -> None: | |
 self.request = request | |
 self.exception_class = exception_class | |
async def _get_request_data(self) -> Optional[Dict[str, Any]]: | |
 if self.request.method == "GET": | |
 return dict(self.request.query_params) | |
 elif self.request.method == "POST": | |
 try: | |
 return await self.request.json() | |
 except JSONDecodeError: | |
 return None | |
 return None | |
@classmethod | |
 async def _get_operator_secret_key(cls, operator_id: str) -> Optional[str]: | |
 # Check cache first | |
 if operator_id in cls._secret_key_cache: | |
 return cls._secret_key_cache[operator_id] | |
# Fetch from DB if not in cache | |
 async with get_db_session() as db: | |
 credentials = await db.scalar( | |
 select(Operator.credentials).filter( | |
 Operator.operator_id == operator_id, Operator.is_active == True | |
 ) | |
 ) | |
 secret_key = credentials.get("secret_key") if credentials else None | |
 | |
 if secret_key: | |
 cls._secret_key_cache[operator_id] = secret_key | |
 | |
 return secret_key | |
def _calculate_signature(self, request_uuid: str, operator_id: str, secret_key: str) -> str: | |
 return hashlib.sha256( | |
 b"|".join([ | |
 str(request_uuid).strip().encode(), | |
 str(operator_id).strip().encode(), | |
 str(secret_key).strip().encode(), | |
 ]) | |
 ).hexdigest() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment