Last active
October 13, 2024 17:41
-
-
Save JeanChristopheMorinPerso/7c729ac87ddc3b701b8c985a7f0c9146 to your computer and use it in GitHub Desktop.
FastAPI OAuth2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Taken from https://github.com/authlib/demo-oauth-client/blob/master/fastapi-google-login/app.py
#
# https://www.keycloak.org/getting-started/getting-started-docker
# https://www.keycloak.org/docs/25.0.2/securing_apps/#endpoints
# https://docs.getunleash.io/how-to/how-to-setup-sso-keycloak-group-sync
# https://developer.okta.com/docs/api/oauth2/
# https://github.com/lukasthaler/fastapi-oauth-examples/blob/main/sessions/main.py
# https://developer.okta.com/blog/2020/12/17/build-and-secure-an-api-in-python-with-fastapi
#
# Another (unrelated) resource: https://developer.okta.com/blog/2022/07/14/add-auth-to-any-app-with-oauth2-proxy
# It explains how to use oauth2-proxy for both a web app and an API.
#
import html
import json

import httpx
from authlib.integrations.starlette_client import OAuth, OAuthError
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from fastapi import FastAPI, HTTPException, status, Depends
from starlette.config import Config
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import HTMLResponse, RedirectResponse


# Settings are read from the process environment; every key has a default so
# the demo still starts with no configuration at all.
config = Config()

app = FastAPI()

# The session cookie is signed with this key. The fallback is only suitable
# for local experiments: set SESSION_SECRET to a random value anywhere else.
app.add_middleware(
    SessionMiddleware,
    secret_key=config('SESSION_SECRET', default='!secret'),
)

oauth = OAuth()

# In-memory store mapping a user's email to the token set obtained at login.
# It lives in this process only, so it is lost on restart and is not shared
# between workers.
cache = {}

CONF_URL = 'http://localhost:8080/realms/test-realm/.well-known/openid-configuration'

oauth.register(
    name='keycloak',
    server_metadata_url=CONF_URL,
    client_id='test-client',
    client_kwargs={
        'scope': 'openid email profile groups',
        'code_challenge_method': 'S256',
    },
)

# NOTE: the provider metadata is not loaded here. On the Starlette client
# ``load_server_metadata()`` is a coroutine, so calling it at import time
# without awaiting it does nothing; authlib awaits it itself on first use.
client: StarletteOAuth2App = oauth.create_client("keycloak")


async def validate_remotely(token):
    """Ask the identity provider whether an access token is still active.

    Args:
        token: Either the access token string, or the dict-like token
            response returned by the provider, in which case its
            ``access_token`` entry is used.

    Returns:
        ``True`` if the introspection endpoint answered 200 and reported the
        token as active, ``False`` otherwise (including when no token is
        given).
    """
    # Callers may hold the whole token response; the introspection endpoint
    # only accepts the bare access token string.
    if isinstance(token, dict):
        token = token.get('access_token')
    if not token:
        return False

    headers = {
        'accept': 'application/json',
        'cache-control': 'no-cache',
        'content-type': 'application/x-www-form-urlencoded',
    }
    data = {
        'client_id': client.client_id,
        # A confidential client would also have to send its 'client_secret'.
        'token': token,
    }

    # Awaiting the loader guarantees the endpoint is known even if no login
    # has happened yet in this process.
    metadata = await client.load_server_metadata()
    url = metadata["introspection_endpoint"]

    # Use the async client so the event loop is not blocked while waiting.
    async with httpx.AsyncClient() as http:
        response = await http.post(url, headers=headers, data=data)

    if response.status_code != httpx.codes.OK:
        return False
    return bool(response.json().get('active', False))


def get_token(request: Request):
    """Dependency returning the cached token set of the session's user.

    The session cookie only carries the user's claims; the token itself is
    looked up in the module-level ``cache`` by the user's email.

    Args:
        request: The incoming request, whose session is inspected.

    Returns:
        The token stored in ``cache`` for the logged-in user.

    Raises:
        HTTPException: 401 if the session has no user, the user has no
            email claim, or no token is cached for that email.
    """
    not_logged_in = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Not logged in',
    )

    user = request.session.get('user')
    if not user:
        raise not_logged_in

    # A missing email claim must end in a 401, not a KeyError (HTTP 500).
    token = cache.get(user.get('email'))
    if not token:
        raise not_logged_in
    return token


async def is_logged_in(request: Request):
    """Dependency ensuring the caller has a session with a still-valid token.

    Args:
        request: The incoming request, whose session is inspected.

    Returns:
        ``True`` when the user's access token was confirmed by the provider.

    Raises:
        HTTPException: 401 if there is no session, no cached token, or the
            provider no longer considers the token active.
    """
    if not request.session:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Not logged in'
        )

    token = get_token(request)

    # A dependency's return value is ignored when it is listed in
    # ``dependencies=[...]``, so a failed validation has to raise to block
    # the request. Only the access token string is sent for introspection.
    if not await validate_remotely(token.get('access_token')):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Not logged in'
        )
    return True


@app.get('/')
async def homepage(request: Request):
    """Render the landing page.

    Logged-in users see their session claims and their cached token dumped
    as JSON plus a logout link; everybody else sees a login link.

    NOTE: the token dump exposes the raw tokens in the page. That is meant
    for debugging this demo only.
    """
    user = request.session.get('user')
    if not user:
        return HTMLResponse('<a href="/login">login</a>')

    # The claims come from the identity provider and may contain markup, so
    # they are escaped before being embedded in the page.
    user_json = html.escape(json.dumps(user))
    token_json = html.escape(json.dumps(cache.get(user.get('email'), {})))

    page = (
        f'<pre>{user_json}</pre>'
        f'<pre>{token_json}</pre>'
        '<a href="/logout">logout</a>'
    )
    return HTMLResponse(page)


@app.get('/login')
async def login(request: Request):
    """Start the login flow by redirecting the browser to the provider.

    The provider is told to send the user back to the ``auth`` route.
    """
    callback_url = request.url_for('auth')
    redirect = await client.authorize_redirect(request, callback_url)
    return redirect


@app.get('/auth')
async def auth(request: Request):
    """Callback route: exchange the authorization code for a token.

    On success the user's claims are stored in the session cookie and the
    full token set is cached in memory under the user's email. The browser
    is then redirected to the home page.

    Raises:
        HTTPException: 401 if the provider reports an OAuth error or the
            returned user info carries no email claim.
    """
    # Exchange auth code for token
    try:
        token = await client.authorize_access_token(request)
    except OAuthError as error:
        # A failed authorization is the caller's problem, not a server fault.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error.error
        ) from error

    user = token.get('userinfo')
    if user:
        # The email is the cache key, so check it before touching any state.
        email = user.get('email')
        if not email:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail='Identity provider returned no email claim'
            )
        cache[email] = token
        # Set the session cookie
        request.session['user'] = dict(user)
    return RedirectResponse(url='/')


@app.get('/logout')
async def logout(request: Request):
    """Log the user out locally and redirect to the home page.

    Removes the user from the session and drops the token cached for them.
    The session at the identity provider itself is not ended.
    """
    user = request.session.pop('user', None)
    if user:
        # Do not keep the tokens in memory once the user has logged out.
        cache.pop(user.get('email'), None)
    return RedirectResponse(url='/')


# this endpoint is explicitly login-protected by requiring the token from the session cookie
@app.get('/users/me')
async def get_current_user(token=Depends(get_token)):
    """Return the current user's profile as reported by the provider.

    The access token of the logged-in user is sent to the provider's
    userinfo endpoint and the JSON answer is returned unchanged.

    Args:
        token: The cached token set of the session's user, injected by
            ``get_token``.

    Raises:
        HTTPException: With the provider's status code if the userinfo
            request does not answer 200 (e.g. 401 for an expired token).
    """
    # use the access token to fetch the user
    headers = {'Authorization': f'Bearer {token.get("access_token")}'}

    metadata = await client.load_server_metadata()
    async with httpx.AsyncClient() as http:
        response = await http.get(metadata['userinfo_endpoint'], headers=headers)

    # surface upstream failures as an HTTP error instead of a bare traceback
    if response.status_code != status.HTTP_200_OK:
        raise HTTPException(
            status_code=response.status_code,
            detail='Could not fetch user info'
        )
    return response.json()


# this endpoint is implicitly login-protected via a dependency checking for a session cookie
@app.get('/privileged', dependencies=[Depends(is_logged_in)])
async def only_for_logged_in_users():
    """Return a fixed greeting; access is gated by ``is_logged_in``."""
    # The provider registered by this app is Keycloak, not Discord.
    return 'Congratulations, you are logged in using Keycloak!'


if __name__ == '__main__':
    # Development entry point: serve the app on the loopback interface only.
    from uvicorn import run

    run(app, host='127.0.0.1', port=8000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment