Created
May 30, 2024 02:26
-
-
Save johnowhitaker/b936016d29138df2becbd1aed9f4defa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests, json, uuid | |
from starlette.responses import FileResponse, RedirectResponse | |
from fastcore.utils import * | |
from fastcore.xml import * | |
from fasthtml import * | |
from oauthlib.oauth2 import WebApplicationClient | |
# Secrets + URLs
# OAuth client credentials come from the environment; a missing variable
# raises KeyError at import time.
auth_client_id = os.environ['AUTH_CLIENT_ID']
auth_client_secret = os.environ['AUTH_CLIENT_SECRET']

# Base URL of this app. The redirect URI sent to Google is derived from it.
DOMAIN = "http://localhost:8000"
auth_redirect_uri = DOMAIN + '/redirect'
auth_client = WebApplicationClient(auth_client_id)

# Google OAuth 2.0 endpoints.
authorization_base_url = "https://accounts.google.com/o/oauth2/v2/auth"
authorization_token_url = "https://www.googleapis.com/oauth2/v4/token"
authorization_user_info_url = "https://www.googleapis.com/oauth2/v3/userinfo"
authorization_revoke_url = "https://accounts.google.com/o/oauth2/revoke"
# The original (misspelled) name is kept as an alias so existing references
# keep working; prefer `authorization_revoke_url` in new code.
authoization_revoke_url = authorization_revoke_url

# Scopes requested at login: OpenID plus the user's email and basic profile.
authorization_scope = [
    "openid",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
]

# In-memory session store: session token -> user-info dict (which also holds
# the Google access token under 'google_auth_token').
users = {}

app = FastHTML(hdrs=(picolink,))
# To fully sign out, we can revoke the access token
def revoke_token(token):
    """Ask the revoke endpoint to invalidate `token`.

    Args:
        token: The access token to revoke.

    Returns:
        True if the endpoint answered with HTTP 200, False for any other
        status or if the request could not be completed.
    """
    try:
        response = requests.post(
            authoization_revoke_url,
            params={"token": token},
            # Without a timeout an unresponsive endpoint blocks the caller
            # indefinitely.
            timeout=10,
        )
    except requests.RequestException:
        # Connection errors and timeouts count as "not revoked" so the
        # boolean contract holds for every outcome.
        return False
    return response.status_code == 200
@app.get("/logout")
async def logout(token:str):
    """End the session identified by `token` and redirect to the home page.

    Args:
        token: The app's session token (presumably resolved by FastHTML from
            the `token` cookie set in `/redirect` -- confirm). May be None or
            unknown, in which case nothing is revoked.

    Returns:
        A redirect to `DOMAIN` that also clears the `token` cookie.
    """
    # Drop the local session first so it is gone even if revocation fails;
    # pop() with a default never raises for a missing/None token.
    user = users.pop(token, None)
    if user is not None:
        revoke_token(user['google_auth_token'])
    response = RedirectResponse(url=DOMAIN)
    # The session no longer exists server-side, so remove the stale cookie.
    response.delete_cookie('token')
    return response
@app.get("/")
async def home(token:str):
    """Render the landing page.

    Args:
        token: The app's session token, or None when the visitor has none.

    Returns:
        A greeting with a logout link when `token` maps to a known session,
        otherwise a generic greeting with a Google login link.
    """
    # Session tokens and the `users` store (which holds Google access tokens)
    # are deliberately not printed: they are credentials.
    user = users.get(token) if token is not None else None
    if user is not None:
        return Div(
            B(f"Hello, {user['email']}!"),
            A('logout', href="/logout"),
        )

    # NOTE(review): `state` is a fixed value and `/redirect` in this file does
    # not check it, so it offers no CSRF protection as written.
    login_link = auth_client.prepare_request_uri(
        authorization_base_url,
        redirect_uri=auth_redirect_uri,
        scope=authorization_scope,
        state='hello',  # Use in app if needed
    )
    print("Log in link:", login_link)
    return Div(
        B("Hello, World!"),
        A('login', href=login_link),
    )
@app.get("/redirect")
async def redirect(code:str):
    """Handle the OAuth callback: trade `code` for tokens and start a session.

    Args:
        code: The authorization code Google appended to the redirect URI.

    Returns:
        A plain error string if no code was supplied or the user-info request
        failed; otherwise a redirect to `DOMAIN` carrying the new session
        token in the `token` cookie.

    Raises:
        Whatever `auth_client.parse_request_body_response` raises when the
        token endpoint returns an error payload, and `requests` exceptions
        on network failure or timeout.
    """
    # Check that we got a code
    if code is None:
        return "No code received"

    # Exchange code for tokens. Passing a dict as `data` sends the parameters
    # form-encoded, which is the encoding OAuth 2.0 token endpoints specify.
    payload = {
        'code': code,
        'redirect_uri': auth_redirect_uri,
        'client_id': auth_client_id,
        'client_secret': auth_client_secret,
        'grant_type': 'authorization_code',
    }
    response = requests.post(authorization_token_url, data=payload, timeout=10)
    # Use the returned token instead of re-reading the shared client's
    # attribute later; the token itself is a secret and is not logged.
    token = auth_client.parse_request_body_response(response.text)
    access_token = token['access_token']

    # Get user info using that token
    response = requests.get(
        authorization_user_info_url,
        headers={'Authorization': f'Bearer {access_token}'},
        timeout=10,
    )
    if not response.ok:
        # Do not create a session from an error payload.
        return "Could not fetch user info"
    user_info = response.json()
    user_info['google_auth_token'] = access_token

    # Register the session under a fresh random token.
    my_special_token = str(uuid.uuid4())
    users[my_special_token] = user_info

    response = RedirectResponse(url=DOMAIN)
    # httponly keeps the session token out of reach of page scripts.
    response.set_cookie('token', my_special_token, httponly=True)
    return response
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment