Last active
January 6, 2024 06:21
-
-
Save bajcmartinez/5062aa41ccbe2df1bbf4f1a9b95bd085 to your computer and use it in GitHub Desktop.
Build Secure APIs with Flask and Auth0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from six.moves.urllib.request import urlopen | |
from functools import wraps | |
from flask import Flask, request, jsonify, _request_ctx_stack | |
from flask_cors import cross_origin | |
from jose import jwt | |
# Placeholder for the Auth0 tenant domain. It is used below to build the JWKS
# URL ("https://<domain>/.well-known/jwks.json") and the expected token issuer.
AUTH0_DOMAIN = 'AUTH0-DOMAIN'
# Placeholder for the API identifier; passed as the expected `audience` when
# tokens are decoded.
API_IDENTIFIER = 'API-IDENTIFIER'
# Signature algorithms accepted when decoding tokens.
ALGORITHMS = ["RS256"]
# The Flask application that the routes and the error handler register on.
app = Flask(__name__)
# Error handler | |
class AuthError(Exception):
    """Raised when a request fails authentication or authorization.

    Attributes:
        error: JSON-serializable payload describing the failure; the code in
            this file always passes a dict with "code" and "description".
        status_code: HTTP status code to send back with the payload.
    """

    def __init__(self, error, status_code):
        # Initialize the base class explicitly so `args` (and therefore
        # str()/repr()/pickling) is well defined regardless of how the class
        # is subclassed later.
        super(AuthError, self).__init__(error, status_code)
        self.error = error
        self.status_code = status_code
@app.errorhandler(AuthError)
def handle_auth_error(ex):
    """
    Converts a raised AuthError into a JSON response that carries
    the error payload and the HTTP status code stored on the exception
    """
    reply = jsonify(ex.error)
    reply.status_code = ex.status_code
    return reply
# Helpers for extracting and validating the Access Token
def get_token_auth_header():
    """
    Obtains the Access Token from the Authorization Header

    Returns:
        The token string, i.e. the second part of "Bearer <token>".

    Raises:
        AuthError: with status 401 when the header is missing, does not
            start with "Bearer", has no token, or has extra parts.
    """
    auth = request.headers.get("Authorization", None)
    if not auth:
        raise AuthError({"code": "authorization_header_missing",
                         "description":
                             "Authorization header is expected"}, 401)

    parts = auth.split()

    # A whitespace-only header is truthy but splits into an empty list;
    # check for that before indexing so it yields a 401 rather than an
    # IndexError.
    if not parts or parts[0].lower() != "bearer":
        raise AuthError({"code": "invalid_header",
                         "description":
                             "Authorization header must start with"
                             " Bearer"}, 401)
    if len(parts) == 1:
        raise AuthError({"code": "invalid_header",
                         "description": "Token not found"}, 401)
    if len(parts) > 2:
        raise AuthError({"code": "invalid_header",
                         "description":
                             "Authorization header must be"
                             " Bearer token"}, 401)

    return parts[1]
def requires_auth(f):
    """
    Determines if the Access Token is valid

    Decorator for view functions. For each call it reads the bearer token
    from the request, downloads the JWKS document for AUTH0_DOMAIN, picks
    the key whose "kid" matches the token header and decodes the token
    against ALGORITHMS, API_IDENTIFIER (audience) and the tenant issuer.
    On success the decoded payload is stored on
    `_request_ctx_stack.top.current_user` and the wrapped view is called.

    Raises:
        AuthError: with status 401 when the token is missing, malformed,
            expired, has wrong claims, or no matching signing key exists.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = get_token_auth_header()

        # Parse the header before any network access so a malformed token
        # is rejected with a 401 instead of surfacing as an unhandled error.
        try:
            unverified_header = jwt.get_unverified_header(token)
        except jwt.JWTError:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)
        token_kid = unverified_header.get("kid")

        # NOTE(review): the JWKS document is fetched on every request;
        # consider caching it if latency becomes a concern.
        jsonurl = urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json")
        try:
            jwks = json.loads(jsonurl.read())
        finally:
            # Explicit close works on both Python 2 and 3 response objects.
            jsonurl.close()

        rsa_key = {}
        if token_kid is not None:
            for key in jwks["keys"]:
                if key.get("kid") == token_kid:
                    rsa_key = {
                        "kty": key["kty"],
                        "kid": key["kid"],
                        "use": key["use"],
                        "n": key["n"],
                        "e": key["e"]
                    }
                    break

        if not rsa_key:
            raise AuthError({"code": "invalid_header",
                             "description": "Unable to find appropriate key"}, 401)

        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=ALGORITHMS,
                audience=API_IDENTIFIER,
                issuer="https://"+AUTH0_DOMAIN+"/"
            )
        except jwt.ExpiredSignatureError:
            raise AuthError({"code": "token_expired",
                             "description": "token is expired"}, 401)
        except jwt.JWTClaimsError:
            raise AuthError({"code": "invalid_claims",
                             "description":
                                 "incorrect claims,"
                                 "please check the audience and issuer"}, 401)
        except Exception:
            # Any other decoding failure is reported as an unparsable token.
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)

        # NOTE(review): `_request_ctx_stack` is a private Flask object that
        # newer Flask releases deprecate/remove -- confirm the pinned Flask
        # version, or migrate to `flask.g` together with any readers.
        _request_ctx_stack.top.current_user = payload
        return f(*args, **kwargs)

    return decorated
def requires_scope(required_scope):
    """
    Checks whether the Access Token of the current request carries a scope

    The claims are read without verifying the token signature, so this is
    only meaningful once the token has already been validated.

    Args:
        required_scope (str): The scope required to access the resource

    Returns:
        True when `required_scope` is one of the whitespace-separated
        entries of the token's "scope" claim, False otherwise.
    """
    claims = jwt.get_unverified_claims(get_token_auth_header())
    granted = claims.get("scope")
    if not granted:
        # Claim absent or empty: nothing can match.
        return False
    return required_scope in granted.split()
@app.route("/")
def index_view():
    """
    Public landing endpoint; no token is needed to call it
    """
    greeting = {"msg": "Hello world!"}
    return jsonify(greeting)
@app.route("/user")
@requires_auth
def user_view():
    """
    Endpoint for authenticated users; requires_auth must accept the token
    """
    greeting = {"msg": "Hello user!"}
    return jsonify(greeting)
@app.route("/admin")
@requires_auth
def admin_view():
    """
    Admin endpoint; besides a valid token, the "read:admin" scope is needed
    """
    # Guard clause: reject first, so the success path reads straight through.
    if not requires_scope("read:admin"):
        raise AuthError({
            "code": "Unauthorized",
            "description": "You don't have access to this resource"
        }, 403)
    return jsonify(msg="Hello admin!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment