Created
August 31, 2023 00:26
-
-
Save fearnworks/194ca98de52df355033a1512e3ae253d to your computer and use it in GitHub Desktop.
Generated by code llama
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import os

from jose import jwt
from passlib.context import CryptContext
# JWT signing configuration.
#
# The signing key is taken from the SECRET_KEY environment variable so that a
# real deployment never has to commit its secret to source control. The
# original placeholder is kept as the fallback (also used when the variable is
# set but empty) purely for backward compatibility; it is publicly known and
# must not be relied on outside local experiments.
SECRET_KEY = os.environ.get("SECRET_KEY") or "YOUR-SECRET-KEY"

# Algorithm name passed to the JWT library when decoding (and signing) tokens.
ALGORITHM = "HS256"

# Lifetime, in minutes, requested for access tokens issued by the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Shared passlib context used by User to hash and verify passwords; bcrypt is
# the only scheme configured.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User:
    """Account record: a username plus the hash of its password."""

    def __init__(self, username: str, password: str):
        """Store *username* and a hash of *password* (the plaintext is not kept)."""
        hashed = pwd_context.hash(password)
        self.username = username
        # NOTE: despite the attribute name, this holds the hash, not the plaintext.
        self.password = hashed

    def verify_password(self, password: str):
        """Return the result of checking *password* against the stored hash."""
        matches = pwd_context.verify(password, self.password)
        return matches
from fastapi import FastAPI, HTTPException, Depends, status | |
from fastapi.security import OAuth2PasswordBearer | |
from pydantic import BaseModel | |
# Application object; the route decorators in this file register on it.
app = FastAPI()

# Dependency that extracts the bearer token from incoming requests; "token" is
# the URL of the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Mock user database (in practice this would be a real database)
# Keys are usernames; each value is a User built from the same name and a
# demo password, which User hashes on construction.
user_db = {
    name: User(name, plaintext)
    for name, plaintext in (("user1", "password1"), ("user2", "password2"))
}
# Pydantic model with a single required string field, `username`.
# NOTE(review): by its name this looks intended to describe data carried by a
# token rather than a login response — confirm intended use.
class TokenData(BaseModel):
    username: str
def authenticate_user(username: str, password: str):
    """Look up *username* in ``user_db`` and check *password* against it.

    Returns the matching user object when the user exists and the password
    verifies; returns ``False`` otherwise.
    """
    candidate = user_db.get(username)
    if candidate and candidate.verify_password(password):
        return candidate
    return False
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the user it was issued for.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM`` and its ``sub``
    claim is looked up in ``user_db``.

    Returns:
        The ``user_db`` entry named by the ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or names a user
            that is not in ``user_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise JWTError, so it alone sits in the try.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = user_db.get(subject)
    if account is None:
        raise unauthorized
    return account
class Token(BaseModel):
    # Shape of the JSON body returned by the /token route.
    access_token: str
    token_type: str


def create_access_token(data: dict, expires_delta=None) -> str:
    """Return a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token (e.g. ``{"sub": username}``). The
            mapping is copied, not mutated.
        expires_delta: ``datetime.timedelta`` until expiry. When ``None``,
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expires_at = datetime.datetime.now(datetime.timezone.utc) + expires_delta
    # "exp" is written as an integer Unix timestamp (JWT NumericDate).
    claims = {**data, "exp": int(expires_at.timestamp())}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: dict):
    """Exchange a username/password pair for a bearer access token.

    Args:
        form_data: Request payload with ``"username"`` and ``"password"`` keys.
            NOTE(review): FastAPI reads a ``dict`` parameter from the JSON
            body, while OAuth2 password-flow clients post form fields;
            consider ``OAuth2PasswordRequestForm``. Left as-is here to keep
            the existing request contract.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 when a credential is missing, is not a string, or
            does not match a known user.
    """
    username = form_data.get("username")
    password = form_data.get("password")

    # Reject missing/non-string credentials up front so they are reported as a
    # failed login rather than reaching the password hasher with a bad type.
    credentials_ok = isinstance(username, str) and isinstance(password, str)
    user = authenticate_user(username, password) if credentials_ok else False
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment