Skip to content

Instantly share code, notes, and snippets.

@JeanChristopheMorinPerso
Last active October 13, 2024 17:41
Show Gist options
  • Save JeanChristopheMorinPerso/7c729ac87ddc3b701b8c985a7f0c9146 to your computer and use it in GitHub Desktop.
Save JeanChristopheMorinPerso/7c729ac87ddc3b701b8c985a7f0c9146 to your computer and use it in GitHub Desktop.
FastAPI OAuth2
# Taken from https://github.com/authlib/demo-oauth-client/blob/master/fastapi-google-login/app.py
#
# https://www.keycloak.org/getting-started/getting-started-docker
# https://www.keycloak.org/docs/25.0.2/securing_apps/#endpoints
# https://docs.getunleash.io/how-to/how-to-setup-sso-keycloak-group-sync
# https://developer.okta.com/docs/api/oauth2/
# https://github.com/lukasthaler/fastapi-oauth-examples/blob/main/sessions/main.py
# https://developer.okta.com/blog/2020/12/17/build-and-secure-an-api-in-python-with-fastapi
#
# Another (unrelated) resource: https://developer.okta.com/blog/2022/07/14/add-auth-to-any-app-with-oauth2-proxy
# This explains how to use oauth2-proxy for both a web app and an API.
#
import json
import httpx
from fastapi import FastAPI, HTTPException, status, Depends
from starlette.config import Config
from starlette.requests import Request
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import HTMLResponse, RedirectResponse
from authlib.integrations.starlette_client import OAuth, OAuthError
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="!secret")
# config = Config('.env')
oauth = OAuth()
cache = {}
CONF_URL = 'http://localhost:8080/realms/test-realm/.well-known/openid-configuration'
oauth.register(
name='keycloak',
server_metadata_url=CONF_URL,
client_id='test-client',
client_kwargs={
'scope': 'openid email profile groups',
'code_challenge_method': 'S256',
}
)
client: StarletteOAuth2App = oauth.create_client("keycloak")
client.load_server_metadata()
async def validate_remotely(token):
headers = {
'accept': 'application/json',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded',
}
data = {
'client_id': client.client_id,
# 'client_secret': clientSecret,
'token': token,
}
url = client.server_metadata["introspection_endpoint"]
response = httpx.post(url, headers=headers, data=data)
return response.status_code == httpx.codes.OK and response.json()['active']
def get_token(request: Request):
"""a dependency to extract the token from the request's session cookie
"""
session = request.session
if not session.get('user'):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Not logged in'
)
if not cache.get(session['user']['email']):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Not logged in'
)
return cache[session['user']['email']]
async def is_logged_in(request: Request):
"""a dependency to ensure a user is logged in via a session cookie
"""
session = request.session
if not session:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Not logged in'
)
return await validate_remotely(get_token(request))
@app.get('/')
async def homepage(request: Request):
user = request.session.get('user')
if user:
data = json.dumps(user)
data2 = json.dumps(cache.get(user['email'], {}))
html = (
f'<pre>{data}</pre>'
f'<pre>{data2}</pre>'
'<a href="/logout">logout</a>'
)
return HTMLResponse(html)
return HTMLResponse('<a href="/login">login</a>')
@app.get('/login')
async def login(request: Request):
redirect_uri = request.url_for('auth')
return await client.authorize_redirect(request, redirect_uri)
@app.get('/auth')
async def auth(request: Request):
# Exchange auth code for token
try:
token = await client.authorize_access_token(request)
except OAuthError as error:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=error.error
)
# print(token)
# request.session['accesstoken'] = json.dumps(token)[:2000]
user = token.get('userinfo')
if user:
# Set the session cookie
request.session['user'] = dict(user)
cache[user['email']] = token
return RedirectResponse(url='/')
@app.get('/logout')
async def logout(request: Request):
request.session.pop('user', None)
return RedirectResponse(url='/')
# this endpoint is explicitly login-protected by requiring the token from the session cookie
@app.get('/users/me')
async def get_current_user(token=Depends(get_token)):
"""get the currently logged-in user based on their session cookie
"""
# use the access token to fetch the user
headers = {'Authorization': f'Bearer {token.get("access_token")}'}
async with aiohttp.ClientSession() as sess:
async with sess.get(DISCORD_API_PATH + '/users/@me', headers=headers) as response:
# catch any http errors
if response.status != status.HTTP_200_OK:
response.raise_for_status()
payload = await response.json()
return payload
# this endpoint is implicitly login-protected via a dependency checking for a session cookie
@app.get('/privileged', dependencies=[Depends(is_logged_in)])
async def only_for_logged_in_users():
return 'Congratulations, you are logged in using Discord!'
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='127.0.0.1', port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment