Created
May 10, 2019 21:26
-
-
Save nilsdebruin/a5771eca2a44478a909bf94052b92d0a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.get("/google_login_client", tags=["security"])
def google_login_client():
    """Return an HTML response wrapping ``google_login_javascript_client``.

    NOTE(review): presumably this is the page that drives the client-side
    Google sign-in flow; the markup itself is defined elsewhere in the file.
    """
    page_markup = google_login_javascript_client
    return HTMLResponse(page_markup)
@app.get("/google_login_server", tags=["security"])
def google_login_server():
    """Return an HTML response wrapping ``google_login_javascript_server``.

    NOTE(review): presumably this is the page that drives the server-side
    (authorization-code) Google sign-in flow; the markup itself is defined
    elsewhere in the file.
    """
    page_markup = google_login_javascript_server
    return HTMLResponse(page_markup)
def _email_from_server_auth_code(auth_code):
    """Exchange a Google authorization code for the email in its ID token.

    Raises whatever the OAuth client raises when the exchange fails, or
    ``KeyError``/``TypeError`` when the resulting ID token has no email.
    """
    credentials = client.credentials_from_clientsecrets_and_code(
        CLIENT_SECRETS_JSON, ["profile", "email"], auth_code
    )
    return credentials.id_token["email"]


def _email_from_client_id_token(raw_token):
    """Verify a Google ID token and return its verified email address.

    Raises ``ValueError`` when the issuer is not Google or when the token
    carries no verified email; verification errors from the Google library
    propagate unchanged.
    """
    idinfo = id_token.verify_oauth2_token(raw_token, requests.Request(), CLIENT_ID)
    # If several client IDs share this backend, verify without CLIENT_ID and
    # check idinfo["aud"] against the list of accepted IDs instead.

    if idinfo["iss"] not in ("accounts.google.com", "https://accounts.google.com"):
        raise ValueError("Wrong issuer.")
    # For G Suite-only logins, additionally compare idinfo["hd"] with the
    # expected hosted domain here.

    email = idinfo.get("email")
    if not (email and idinfo.get("email_verified")):
        raise ValueError("Email missing or not verified.")
    return email


@app.post(f"{SWAP_TOKEN_ENDPOINT}", response_model=Token, tags=["security"])
async def swap_token(request: Request = None):
    """Swap a Google credential for this application's own access token.

    The request must carry a non-empty ``X-Requested-With`` header and an
    ``X-Google-OAuth2-Type`` header of ``"server"`` (body is an authorization
    code) or ``"client"`` (body is a Google ID token).  The email obtained
    from Google is looked up via ``authenticate_user_email``; on success an
    access token is returned in the JSON body and also set as an HTTP-only
    cookie.

    Raises:
        HTTPException: status 400 when the required headers are missing or
            unrecognised, when the Google credential cannot be validated, or
            when the email does not match a known user.
    """
    if not request.headers.get("X-Requested-With"):
        raise HTTPException(status_code=400, detail="Incorrect headers")

    google_client_type = request.headers.get("X-Google-OAuth2-Type")
    if google_client_type == "server":
        resolve_email = _email_from_server_auth_code
    elif google_client_type == "client":
        resolve_email = _email_from_client_id_token
    else:
        # Without this guard `email` would never be bound and the handler
        # would crash with an UnboundLocalError (HTTP 500).
        raise HTTPException(status_code=400, detail="Incorrect headers")

    body_bytes = await request.body()
    try:
        # jsonable_encoder is applied to the raw body to obtain the value
        # handed to the Google libraries (presumably the decoded text).
        auth_code = jsonable_encoder(body_bytes)
        email = resolve_email(auth_code)
    except Exception as err:
        # Boundary with third-party OAuth libraries that raise a variety of
        # exception types; any failure means the login is not trusted.
        # `Exception` (not a bare except) lets cancellation propagate.
        raise HTTPException(
            status_code=400, detail="Unable to validate social login"
        ) from err

    authenticated_user = authenticate_user_email(fake_users_db, email)
    if not authenticated_user:
        raise HTTPException(status_code=400, detail="Incorrect email address")

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": authenticated_user.email}, expires_delta=access_token_expires
    )
    token = jsonable_encoder(access_token)

    response = JSONResponse({"access_token": token, "token_type": "bearer"})
    # NOTE(review): the cookie lifetime is hard-coded to 1800 seconds while
    # the token lifetime comes from ACCESS_TOKEN_EXPIRE_MINUTES - confirm the
    # two are meant to stay in sync.
    response.set_cookie(
        COOKIE_AUTHORIZATION_NAME,
        value=f"Bearer {token}",
        domain=COOKIE_DOMAIN,
        httponly=True,
        max_age=1800,
        expires=1800,
    )
    return response
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment