Skip to content

Instantly share code, notes, and snippets.

@vincentsarago
Created December 17, 2020 08:28
Show Gist options
  • Save vincentsarago/ec4e362ef9cca39bc0ee79013b18efbe to your computer and use it in GitHub Desktop.
Save vincentsarago/ec4e362ef9cca39bc0ee79013b18efbe to your computer and use it in GitHub Desktop.
security example
"""app dependencies."""
import re
from dataclasses import dataclass
from jose import JWTError
from fastapi import HTTPException, Query, Security
from fastapi.security.api_key import APIKeyQuery
# Security scheme that reads the API key from the `access_token` query
# parameter. With auto_error=False a missing key is not rejected by the scheme
# itself; PathParams.__post_init__ checks for it and raises its own 403.
api_key_query = APIKeyQuery(name="access_token", auto_error=False)
from .models import AccessToken
@dataclass
class PathParams:
    """Dataset URL plus a validated access token.

    The field defaults are FastAPI ``Query``/``Security`` markers, so this
    class is presumably meant to be used as an endpoint dependency
    (NOTE(review): no usage is visible in this file -- confirm).

    Attributes:
        url: Dataset URL, read from the ``url`` query parameter.
        api_key_query: Raw token string taken from the ``access_token`` query
            parameter; empty/None when the caller did not send one.
        token: Parsed ``AccessToken``; set by ``__post_init__`` once the raw
            token has been validated.
    """

    url: str = Query(..., description="Dataset URL")
    api_key_query: str = Security(api_key_query)

    def __post_init__(self):
        """Validate the raw token and store the parsed result in ``self.token``.

        Raises:
            HTTPException: 403 when the token is missing, cannot be decoded,
                or decodes to claims that do not form a valid ``AccessToken``.
        """
        if not self.api_key_query:
            raise HTTPException(status_code=403, detail="Missing `access_token`")

        try:
            self.token = AccessToken.from_string(self.api_key_query)
        except (JWTError, KeyError, ValueError) as err:
            # Besides decode failures (JWTError), from_string raises KeyError
            # when the payload has no "sub" claim and a pydantic
            # ValidationError (a ValueError subclass) when the claims are
            # rejected by the model. All of these mean "invalid token" and
            # must not surface as a 500.
            raise HTTPException(
                status_code=403, detail="Invalid `access_token`"
            ) from err
"""Ressource models."""
from datetime import datetime, timedelta
from typing import List, Optional
from jose import jwt
from pydantic import BaseModel, BaseSettings, Field, validator
# Scopes a token may carry; AccessToken.valid_scopes rejects anything else.
availables_scopes = ["tiles:read"]
# TODO: add user/password login to be able to create a token.
# This will need to be plugged into a DB.
class AuthSettings(BaseSettings):
    """Token signing settings, configured with the ``SECURITY_`` env prefix."""

    # Signing key. It has no default, so a value must be provided.
    # One way to generate a key: `openssl rand -hex 32`, which yields e.g.
    # "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    # (illustrative value only -- never deploy with it).
    secret: str

    # Token lifetime; AccessToken adds it to `iat` as a number of seconds.
    expires: int = 3600

    # Algorithm name handed to jose when encoding and decoding tokens.
    algorithm: str = "HS256"

    class Config:
        """Settings configuration."""

        env_prefix = "SECURITY_"
class AccessToken(BaseModel):
    """Claims of an API token, convertible to and from a signed JWT string.

    ``str(token)`` produces the encoded JWT and ``AccessToken.from_string``
    parses one back. Both rely on a module-level ``auth_config`` object
    exposing ``secret``, ``algorithm`` and ``expires`` (NOTE(review): it is not
    defined in the visible source -- presumably an ``AuthSettings`` instance).

    Attributes:
        sub: User name (1-32 characters from ``[a-zA-Z0-9-_]``). It is
            populated through the ``username`` alias.
        scope: Granted scopes; every item must be in ``availables_scopes``.
        iat: Creation time. Always overwritten with the current UTC time.
        exp: Expiration time. Always overwritten with ``iat`` plus
            ``auth_config.expires`` seconds.
        groups: Optional list of group names.
    """

    sub: str = Field(..., alias="username", regex="^[a-zA-Z0-9-_]{1,32}$")
    # Items are typed as `str` so that non-string values are rejected (or
    # coerced) by pydantic before `valid_scopes` calls `.lower()` on them.
    scope: List[str] = ["tiles:read"]
    iat: Optional[datetime] = None
    exp: Optional[datetime] = None
    groups: Optional[List[str]] = None

    @validator("iat", pre=True, always=True)
    def set_creation_time(cls, v) -> datetime:
        """Set token creation time (iat).

        The incoming value is ignored on purpose: every instance, including
        one built by ``from_string``, is stamped with the current UTC time.
        """
        return datetime.utcnow()

    @validator("exp", always=True)
    def set_expiration_time(cls, v, values) -> datetime:
        """Set token expiration time (exp).

        The incoming value is ignored; the result is always ``iat`` plus the
        configured lifetime in seconds.
        """
        return values["iat"] + timedelta(seconds=auth_config.expires)

    @validator("scope", each_item=True)
    def valid_scopes(cls, v, values):
        """Lower-case a scope and check that it is an available one.

        Raises:
            ValueError: when the scope is not listed in ``availables_scopes``.
        """
        v = v.lower()
        if v not in availables_scopes:
            raise ValueError(f"Invalid scope: {v}")
        return v

    class Config:
        """Access Token Model config."""

        # Unknown fields (e.g. unexpected JWT claims) are a validation error.
        extra = "forbid"

    @property
    def username(self) -> str:
        """Return Username."""
        return self.sub

    def __str__(self):
        """Create jwt token string.

        Fields that are ``None`` are left out of the encoded claims.
        """
        return jwt.encode(
            self.dict(exclude_none=True),
            auth_config.secret,
            algorithm=auth_config.algorithm,
        )

    @classmethod
    def from_string(cls, token: str):
        """Parse jwt token string.

        Args:
            token: Encoded JWT, as produced by ``str(AccessToken(...))``.

        Returns:
            An ``AccessToken`` built from the decoded claims. Its ``iat`` and
            ``exp`` are recomputed by the validators rather than copied from
            the token.

        Raises:
            KeyError: when the decoded claims contain no ``sub`` entry.
            Exception: whatever ``jwt.decode`` or model validation raise for
                an undecodable token or unacceptable claims.
        """
        res = jwt.decode(token, auth_config.secret, algorithms=[auth_config.algorithm])
        # The model is populated through the "username" alias, not "sub".
        res["username"] = res.pop("sub")
        return cls(**res)
"""api.cogeo.xyz Tokens."""
from typing import Any, Dict

from fastapi import APIRouter, HTTPException, Query
from pydantic import ValidationError

from ..ressources.models import AccessToken
# Router on which the token-creation endpoints below are registered.
router = APIRouter()
@router.post(r"/create", responses={200: {"description": "Create a token"}})
def create_token(body: AccessToken):
    """Encode the posted claims and return them as ``{"token": <jwt string>}``."""
    encoded = str(body)
    return {"token": encoded}
@router.get(r"/create", responses={200: {"description": "Create a token"}})
def get_token(
    username: str = Query(..., description="Username"),
    scope: str = Query(None, description="Comma (,) delimited token scopes"),
):
    """Create a token from query parameters.

    Args:
        username: User name to store in the token.
        scope: Optional comma-delimited scopes. Whitespace around each item is
            ignored and empty items are dropped; when nothing remains, the
            default scope of ``AccessToken`` applies.

    Returns:
        ``{"token": <jwt string>}``.

    Raises:
        HTTPException: 422 when the username or a scope fails validation.
    """
    params: Dict[str, Any] = {"username": username}
    if scope:
        scopes = [item.strip() for item in scope.split(",")]
        scopes = [item for item in scopes if item]
        if scopes:
            params["scope"] = scopes

    try:
        token = AccessToken(**params)
    except ValidationError as err:
        # The model is built inside the handler, so FastAPI does not translate
        # this error by itself; without the conversion the client gets a 500.
        raise HTTPException(status_code=422, detail=err.errors()) from err

    return {"token": str(token)}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment