Created
July 23, 2020 13:37
-
-
Save cmabastar/8d51504fdff31396b1a7cff5f4a0ac75 to your computer and use it in GitHub Desktop.
Cloudflare Access decorator for Flask auth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@bp.route("/", methods=["GET", "POST"])
@cf_verify_token
def admin():
    """Example admin view guarded by ``cf_verify_token`` (placeholder body)."""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import abort, request, current_app, g | |
from functools import wraps | |
import jwt | |
import json | |
import requests | |
def _get_public_keys():
    """
    Fetch the JWK set from the URL stored in the ``AUTH_DOMAIN`` app config
    entry and convert every entry under its ``"keys"`` member to an RSA key.

    Returns:
        List of RSA public keys usable by PyJWT.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with an HTTP error status.
        ValueError: if the response body is not valid JSON.
        KeyError: if the JSON document has no ``"keys"`` member.
    """
    # Bound the request so an unresponsive endpoint cannot hang the worker.
    resp = requests.get(current_app.config['AUTH_DOMAIN'], timeout=10)
    # Fail loudly on 4xx/5xx instead of parsing an error page as a JWK set.
    resp.raise_for_status()
    jwk_set = resp.json()
    # from_jwk() takes the JWK as a JSON string, hence the re-serialisation.
    return [
        jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict))
        for key_dict in jwk_set["keys"]
    ]
def cf_verify_token(f):
    """
    Decorator that wraps a Flask API call to verify the CF Access JWT.

    Behaviour of the wrapped view:

    * If the ``ADMIN_CF_AUTHORIZE`` config entry is falsy, verification is
      skipped, ``g.admin_user`` is set to ``'localtest'`` and the view runs.
    * Otherwise the token is read from the ``CF_Authorization`` cookie and
      decoded against each key returned by ``_get_public_keys()``, using the
      ``POLICY_AUD`` config entry as the expected audience. On the first
      successful decode ``g.admin_user`` is set to the token's ``email``
      claim and the view runs.
    * The request is aborted with 401 when the cookie is missing or empty,
      or when no key validates the token.

    Args:
        f: the view function to protect.

    Returns:
        The wrapped view function (metadata preserved via ``functools.wraps``).
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not current_app.config['ADMIN_CF_AUTHORIZE']:
            g.admin_user = 'localtest'
            return f(*args, **kwargs)

        token = request.cookies.get("CF_Authorization")
        if not token:
            abort(401)

        audience = current_app.config['POLICY_AUD']
        # Loop through the keys since we can't pass the key set to the decoder
        data = None
        for key in _get_public_keys():
            try:
                # decode returns the claims that has the email when needed.
                # The accepted algorithm is pinned explicitly: PyJWT 2.x
                # refuses to verify a signature without ``algorithms``, and
                # pinning it prevents algorithm-confusion attacks.
                data = jwt.decode(
                    token,
                    key=key,
                    audience=audience,
                    algorithms=["RS256"],
                )
                break
            except jwt.InvalidTokenError as ex:
                # Expected for every key that did not sign this token; keep
                # trying the remaining keys but leave a trace for debugging.
                current_app.logger.debug(
                    "CF Access token not accepted by key: %s", ex
                )

        if data is None:
            abort(401)

        g.admin_user = data['email']
        return f(*args, **kwargs)
    return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment