Created
December 17, 2020 08:28
-
-
Save vincentsarago/ec4e362ef9cca39bc0ee79013b18efbe to your computer and use it in GitHub Desktop.
security example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""app dependencies.""" | |
import re | |
from dataclasses import dataclass | |
from jose import JWTError | |
from fastapi import HTTPException, Query, Security | |
from fastapi.security.api_key import APIKeyQuery | |
# Security scheme that reads the token from the `access_token` query
# parameter. `auto_error=False` leaves the "missing token" case to
# `PathParams.__post_init__`, which checks for a falsy value itself.
api_key_query = APIKeyQuery(name="access_token", auto_error=False)
from .models import AccessToken | |
@dataclass
class PathParams:
    """Dataset URL plus a validated access token.

    The token is decoded as soon as the instance is built, so constructing
    a `PathParams` either yields a usable `token` attribute or raises.

    Attributes:
        url: Dataset URL (required query parameter).
        api_key_query: Raw `access_token` value; falsy when none was sent.
        token: Decoded `AccessToken`, assigned in `__post_init__` (it is not
            a dataclass field).

    """

    url: str = Query(..., description="Dataset URL")
    api_key_query: str = Security(api_key_query)

    def __post_init__(self):
        """Decode the access token and reject the request when it is unusable.

        Raises:
            HTTPException: 403 when the token is missing, or when it cannot
                be decoded into a valid `AccessToken`.

        """
        if not self.api_key_query:
            raise HTTPException(status_code=403, detail="Missing `access_token`")

        try:
            self.token = AccessToken.from_string(self.api_key_query)
        except (JWTError, KeyError, ValueError) as err:
            # `from_string` can fail in three ways: the JWT itself is bad
            # (JWTError), the payload has no `sub` claim (KeyError), or the
            # claims do not validate against the model (pydantic's
            # ValidationError, a ValueError subclass). All of them mean the
            # caller sent a bad token, so answer 403 rather than a 500.
            raise HTTPException(
                status_code=403, detail="Invalid `access_token`"
            ) from err
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Ressource models.""" | |
from datetime import datetime, timedelta | |
from typing import List, Optional | |
from jose import jwt | |
from pydantic import BaseModel, BaseSettings, Field, validator | |
# Scopes a token may carry; `AccessToken.valid_scopes` rejects anything else.
availables_scopes = ["tiles:read"]

# TODO: add user/password login to be able to create a Token.
# This will need to be plugged into a DB.
class AuthSettings(BaseSettings):
    """Token signing settings, read from `SECURITY_`-prefixed variables."""

    # Signing secret; required, there is no default.
    # Example value only (do not reuse), generated with
    # `openssl rand -hex 32`:
    # "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    secret: str
    # Presumably the token lifetime in seconds: `AccessToken` adds
    # `timedelta(seconds=auth_config.expires)` to `iat` -- confirm that
    # `auth_config` is an instance of this class.
    expires: int = 3600
    # Algorithm name handed to `jwt.encode` / `jwt.decode` by `AccessToken`
    # (same assumption about `auth_config`).
    algorithm: str = "HS256"

    class Config:
        """Model config."""

        env_prefix = "SECURITY_"
class AccessToken(BaseModel):
    """API token claims, convertible to and from a signed JWT string.

    `iat` and `exp` are always recomputed by the validators below: any value
    supplied by the caller, or carried by a decoded token, is ignored, so an
    instance built by `from_string` holds timestamps relative to the moment
    of parsing.

    NOTE(review): `auth_config` is used by this class but is not defined in
    the visible source; presumably a module-level `AuthSettings()` instance
    -- confirm.
    """

    # JWT subject. Populated through the `username` alias.
    sub: str = Field(..., alias="username", regex="^[a-zA-Z0-9-_]{1,32}$")
    # Items are typed as `str` so pydantic rejects or coerces non-string
    # input before `valid_scopes` calls `.lower()` on it.
    scope: List[str] = ["tiles:read"]
    iat: Optional[datetime] = None
    exp: Optional[datetime] = None
    groups: Optional[List[str]] = None

    class Config:
        """Access Token Model config."""

        extra = "forbid"

    @validator("iat", pre=True, always=True)
    def set_creation_time(cls, v) -> datetime:
        """Set token creation time (iat) to now, ignoring any input value."""
        return datetime.utcnow()

    @validator("exp", always=True)
    def set_expiration_time(cls, v, values) -> datetime:
        """Set token expiration time (exp) to `iat` plus the configured lifetime."""
        return values["iat"] + timedelta(seconds=auth_config.expires)

    @validator("scope", each_item=True)
    def valid_scopes(cls, v: str) -> str:
        """Lower-case one scope and check it against `availables_scopes`.

        Raises:
            ValueError: when the scope is not in `availables_scopes`.

        """
        scope = v.lower()
        if scope not in availables_scopes:
            raise ValueError(f"Invalid scope: {scope}")
        return scope

    @property
    def username(self) -> str:
        """Return Username (the `sub` claim)."""
        return self.sub

    def __str__(self) -> str:
        """Create jwt token string from the non-None claims."""
        return jwt.encode(
            self.dict(exclude_none=True),
            auth_config.secret,
            algorithm=auth_config.algorithm,
        )

    @classmethod
    def from_string(cls, token: str) -> "AccessToken":
        """Parse jwt token string.

        Args:
            token: Encoded JWT, as produced by `str(AccessToken(...))`.

        Returns:
            The model built from the decoded claims.

        Raises:
            KeyError: when the decoded payload has no `sub` claim.

        Errors raised by `jwt.decode` and by model validation propagate
        unchanged.

        """
        claims = jwt.decode(
            token, auth_config.secret, algorithms=[auth_config.algorithm]
        )
        # The model is populated through the `username` alias, not `sub`.
        claims["username"] = claims.pop("sub")
        return cls(**claims)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""api.cogeo.xyz Tokens.""" | |
from typing import Any, Dict | |
from ..ressources.models import AccessToken | |
from fastapi import APIRouter, Query | |
# Router on which the `/create` endpoints in this module are registered.
router = APIRouter()
@router.post(r"/create", responses={200: {"description": "Create a token"}})
def create_token(body: AccessToken):
    """Encode the posted token claims and return the resulting string."""
    encoded = str(body)
    return {"token": encoded}
@router.get(r"/create", responses={200: {"description": "Create a token"}})
def get_token(
    username: str = Query(..., description="Username"),
    scope: str = Query(None, description="Coma (,) delimited token scopes"),
):
    """Create a token from query parameters.

    Args:
        username: Token subject.
        scope: Optional comma-delimited scopes. Surrounding whitespace and
            empty items are dropped; when nothing is left the model's
            default scope applies.

    Returns:
        A dict with a single `"token"` key holding the encoded token.

    Raises:
        HTTPException: 422 when the username or a scope fails validation.

    """
    # Imported locally: this module only imports `APIRouter` and `Query`
    # from fastapi at the top.
    from fastapi import HTTPException

    params: Dict[str, Any] = {"username": username}
    if scope:
        # Accept "a, b" and trailing commas instead of failing on "" / " b".
        scopes = [item.strip() for item in scope.split(",") if item.strip()]
        if scopes:
            params["scope"] = scopes

    try:
        token = AccessToken(**params)
    except ValueError as err:
        # The model is built inside the handler, so a pydantic
        # ValidationError (a ValueError subclass) would otherwise surface
        # as a 500. It is the caller's input that is wrong, so report 422.
        raise HTTPException(status_code=422, detail=str(err)) from err

    return {"token": str(token)}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment