Authorizer
import argparse
import base64
import configparser
import errno
import json
import logging
import os
import re
import struct
import subprocess
from typing import Dict, Any, Tuple, Optional, Callable, List

import jwt
import requests
from cachetools import TTLCache, cached
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from flask import Flask, request, Response
from jwt import InvalidTokenError, InvalidIssuerError
from requests import HTTPError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)


class Config:
    GLOBAL_AUDIENCE = ""
    AUTHORIZED_ISSUERS = {}
    DEFAULT_RESOURCE = ""
    ALGORITHM = "RS256"
    JWT_OPTIONS = {}
    RESOURCE_CHECKS: Dict[str, List] = {"default": ["group_membership"]}
    NO_VERIFY = False
    NO_AUTHORIZE = False
    REALM = "tokens"
    WWW_AUTHENTICATE = "Bearer"
    WEBDAV_SERVICE_PATH = ""
    WEBDAV_AUTHORIZER = "sudo"
    GROUP_DEPLOYMENT_PREFIX = "lsst_"

    @staticmethod
    def load(fname):
        global logger
        logger.info("Loading configuration from %s" % fname)
        cp = configparser.ConfigParser()
        try:
            with open(fname, "r") as fp:
                cp.read_file(fp)
        except IOError as ie:
            if ie.errno == errno.ENOENT:
                return
            raise

        # Logging
        if "loglevel" in cp.options("Global"):
            level = cp.get("Global", "loglevel")
            logger.info(f"Reconfiguring log, level={level}")
            # Reconfigure logging
            for handler in logging.root.handlers[:]:
                logging.root.removeHandler(handler)
            logging.basicConfig(level=level)
            logger = logging.getLogger(__name__)
            if level == "DEBUG":
                logging.getLogger('werkzeug').setLevel(level)

        # Globals
        if 'audience_json' in cp.options("Global"):
            # Read in the audience as JSON: either a list or a string
            Config.GLOBAL_AUDIENCE = json.loads(cp.get("Global", "audience_json"))
        elif 'audience' in cp.options("Global"):
            Config.GLOBAL_AUDIENCE = cp.get("Global", "audience")
            if ',' in Config.GLOBAL_AUDIENCE:
                # Split the comma-separated audience list
                Config.GLOBAL_AUDIENCE = re.split(r"\s*,\s*", Config.GLOBAL_AUDIENCE)
        if 'default_resource' in cp.options("Global"):
            Config.DEFAULT_RESOURCE = cp.get("Global", "default_resource")
        if 'realm' in cp.options("Global"):
            Config.REALM = cp.get("Global", "realm")
            logger.info(f"Configured realm {Config.REALM}")
        if 'www_authenticate' in cp.options("Global"):
            Config.WWW_AUTHENTICATE = cp.get("Global", "www_authenticate")
            logger.info(f"Configured WWW-Authenticate type: {Config.WWW_AUTHENTICATE}")
        if "no_verify" in cp.options("Global"):
            Config.NO_VERIFY = cp.getboolean("Global", "no_verify")
            logger.warning("Authentication verification is disabled")
        if "no_authorize" in cp.options("Global"):
            Config.NO_AUTHORIZE = cp.getboolean("Global", "no_authorize")
            logger.warning("Authorization is disabled")
        if 'webdav_service_path' in cp.options("Global"):
            webdav_service_path = cp.get("Global", "webdav_service_path")
            Config.WEBDAV_SERVICE_PATH = webdav_service_path
            logger.info(f"Configured WebDAV service path as: {webdav_service_path}")
        else:
            logger.warning("No WebDAV service path defined for application")
        if 'webdav_authorizer' in cp.options("Global"):
            authorizer = cp.get("Global", "webdav_authorizer")
            if authorizer not in WEBDAV_AUTHORIZERS:
                raise Exception("No valid WebDAV authorizer found")
            Config.WEBDAV_AUTHORIZER = authorizer
            logger.info(f"Configured WebDAV authorizer: {authorizer}")
        if 'group_deployment_prefix' in cp.options("Global"):
            prefix = cp.get("Global", "group_deployment_prefix")
            Config.GROUP_DEPLOYMENT_PREFIX = prefix
            logger.info(f"Configured group deployment prefix: {prefix}")

        # Find JWT options
        for option_name in cp.options("Global"):
            if option_name.startswith("jwt_"):
                key = option_name[len("jwt_"):]
                value = cp.get("Global", option_name)
                Config.JWT_OPTIONS[key] = value

        # Find resource check callables
        for option_name in cp.options("Global"):
            if option_name.startswith("resource_checks_"):
                key = option_name[len("resource_checks_"):]
                values = json.loads(cp.get("Global", option_name))
                if not isinstance(values, list):
                    raise Exception(f"Resource checks for {key} must be a JSON list")
                for callable_name in values:
                    if callable_name not in CHECK_ACCESS_CALLABLES:
                        raise Exception(f"No access checker for id {callable_name}")
                Config.RESOURCE_CHECKS[key] = values
        for resource, callables in Config.RESOURCE_CHECKS.items():
            logger.info(f"Configured resource checks: {resource} - {callables}")

        # Issuer sections
        for section in cp.sections():
            logger.debug(f"Processing section {section}")
            if not section.lower().startswith("issuer "):
                continue
            if 'issuer' not in cp.options(section):
                logger.warning(f"Ignoring section {section} as it has no `issuer`")
                continue
            issuer = cp.get(section, 'issuer')
            issuer_info = Config.AUTHORIZED_ISSUERS.setdefault(issuer, {})
            if 'map_subject' in cp.options(section):
                issuer_info['map_subject'] = cp.getboolean(section, 'map_subject')
            logger.info(f"Configured token access for {section} (issuer {issuer}): {issuer_info}")
        logger.info("Configured Issuers")


@app.route('/auth')
def flask_listener():
    """
    Authenticate and authorize a token.
    """
    # Default to Server Error for safety; we must explicitly set 200
    # when the request is okay.
    response = Response(status=500)
    if 'Authorization' not in request.headers and "x-oauth-basic" not in request.cookies:
        response = _needs_authentication(response, "No Authorization header", "")
        return response
    auth_type, _, auth_blob = request.headers.get('Authorization', '').partition(" ")
    encoded_token = None
    if "x-oauth-basic" in request.cookies:
        encoded_token = request.cookies["x-oauth-basic"]
    elif auth_type.lower() == "bearer":
        encoded_token = auth_blob
    elif auth_type.lower() == "basic":
        logger.debug("Using OAuth with Basic")
        # Fall back to HTTP Basic: by convention the token is in the user
        # field and the literal "x-oauth-basic" is in the password field.
        encoded_basic_auth = auth_blob
        basic_auth = base64.b64decode(encoded_basic_auth)
        user, password = basic_auth.strip().split(b":")
        if password == b"x-oauth-basic":
            # Recommended default
            encoded_token = user.decode()
        elif user == b"x-oauth-basic":
            # ... Could be this though
            logger.warning("Protocol `x-oauth-basic` should be in password field")
            encoded_token = password.decode()
        else:
            logger.info("No protocol for token specified")
            encoded_token = user.decode()

    # Decode the token.
    # Send a 401 error code if there is any problem.
    try:
        unverified_header = jwt.get_unverified_header(encoded_token)
        unverified_token = jwt.decode(encoded_token, verify=False)
    except InvalidTokenError as e:
        response = _needs_authentication(response, "Invalid Token", str(e))
        logger.exception("Failed to decode token")
        logger.exception(e)
        return response
    try:
        if Config.NO_VERIFY:
            logger.debug("Skipping verification of the token")
            verified_token = unverified_token
        else:
            iss = unverified_token['iss']
            if iss not in Config.AUTHORIZED_ISSUERS:
                raise InvalidIssuerError(f"Unauthorized issuer: {iss}")
            key = get_key_as_pem(iss, unverified_header["kid"])
            verified_token = jwt.decode(encoded_token,
                                        key,
                                        algorithms=[Config.ALGORITHM],
                                        audience=Config.GLOBAL_AUDIENCE,
                                        options=Config.JWT_OPTIONS)
    except Exception as e:
        response = _needs_authentication(response, "Invalid Token", str(e))
        logger.exception("Failed to verify token")
        logger.exception(e)
        return response
    if "Basic" in request.headers.get("Authorization", "") and "x-oauth-basic" not in request.cookies:
        response.set_cookie("x-oauth-basic", encoded_token)
    if Config.NO_AUTHORIZE:
        response.set_data("Authorization is Ok")
        response.status_code = 200
        return response

    # Authorization checks
    request_method = request.headers.get('X-Original-Method')
    request_path = request.headers.get('X-Original-URI')
    resource = request.args.get("resource") or Config.DEFAULT_RESOURCE
    capability = request.args.get("capability") or get_capability(resource, request_method,
                                                                  request_path)
    assert capability is not None, "ERROR: Check nginx configuration for this resource"
    jti = str(verified_token['jti']) if "jti" in verified_token else None
    (success, message) = check_authorization(capability, request_method, request_path,
                                             verified_token)
    response.set_data(message)
    if success:
        response.status_code = 200
        if jti:
            logger.info(f"Allowed token with Token ID: {jti}")
        return response
    if jti:
        logger.error(f"Failed to authorize Token ID {jti} because {message}")
    else:
        logger.error(f"Failed to authorize token because {message}")
    response.status_code = 403
    return response
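
# A sketch of how this endpoint is typically wired up behind nginx's
# auth_request module (assumed deployment; hostnames, ports, and paths are
# illustrative only):
#
#   location = /auth {
#       internal;
#       proxy_pass http://127.0.0.1:8080/auth?resource=workspace;
#       proxy_pass_request_body off;
#       proxy_set_header Content-Length "";
#       proxy_set_header X-Original-Method $request_method;
#       proxy_set_header X-Original-URI $request_uri;
#   }
#
#   location /api/webdav/ {
#       auth_request /auth;
#       # proxied WebDAV backend configuration goes here
#   }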


def _needs_authentication(response: Response, error: str, message: str) -> Response:
    """Modify the response for a 401 as appropriate."""
    response.status_code = 401
    response.set_data(error)
    if not Config.WWW_AUTHENTICATE:
        return response
    if Config.WWW_AUTHENTICATE.lower() == "basic":
        response.headers['WWW-Authenticate'] = \
            f'Basic realm="{Config.REALM}"'
    else:
        # Otherwise, send a Bearer challenge
        response.headers['WWW-Authenticate'] = \
            f'Bearer realm="{Config.REALM}",error="{error}",error_description="{message}"'
    return response


def get_capability(resource: str, request_method: str, request_path: str) -> Optional[str]:
    """
    Get the capability for this request.
    :param resource: Resource for the request
    :param request_method: Original request method
    :param request_path: Original request path
    :return: A string if we were able to determine the capability, or None
    """
    if resource == "workspace":
        op = ""
        if request_method in ["GET", "OPTIONS", "PROPFIND"]:
            op = 'read'
        elif request_method in ["PUT", "POST", "DELETE", "MKCOL", "COPY", "MOVE"]:
            op = 'write'
        return f"{op}:{resource}"
    return None
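
# For example, a PROPFIND against the workspace resource yields the capability
# "read:workspace" and a MKCOL yields "write:workspace"; any other resource is
# expected to supply an explicit `capability` query parameter to /auth.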


def check_authorization(capability: str, request_method: str, request_path: str,
                        verified_token: Dict[str, Any]) -> Tuple[bool, str]:
    """
    Check the authorization of the request based on the original method,
    request path, and token.
    :param capability: The capability we are authorizing
    :param request_method: Original HTTP method
    :param request_path: The original request path
    :param verified_token: The verified token
    :rtype: Tuple[bool, str]
    :returns: (True, message) if all checks pass,
        otherwise (False, message)
    """
    (op, resource) = capability.split(":")
    check_access_callables = get_check_access_functions(resource)
    successes = []
    message = None
    for check_access in check_access_callables:
        (successful, message) = check_access(capability, request_method, request_path,
                                             verified_token)
        if not successful:
            break
        successes.append(successful)
    success = sum(successes) == len(check_access_callables)
    return success, message


def get_check_access_functions(resource: str) -> List[Callable]:
    """
    Return the check access callables for a resource.
    :param resource: The resource being checked
    :return: A list of check access callables
    """
    checker_names = Config.RESOURCE_CHECKS.get(resource)
    if not checker_names:
        checker_names = Config.RESOURCE_CHECKS.get("default")
    callables = []
    for checker_name in checker_names:
        callables.append(CHECK_ACCESS_CALLABLES[checker_name])
    return callables


def scp_check_access(capability: str, request_method: str, request_path: str,
                     token: Dict[str, Any]) -> Tuple[bool, str]:
    """Check that a user has access to this service for the requested
    operation, assuming the token carries an "scp" claim.
    :param capability: The capability we are checking against
    :param request_method: The operation requested for this service
    :param request_path: The URI that will be tested
    :param token: The verified token
    :rtype: Tuple[bool, str]
    :returns: (successful, message) with successful as True if the
    token's scopes include the capability, otherwise (False, message)
    """
    capabilities = set(token.get("scp", []))
    if capability in capabilities:
        return True, "Success"
    return False, f"No capability found: {capability}"


def group_membership_check_access(capability: str, request_method: str, request_path: str,
                                  token: Dict[str, Any]) -> Tuple[bool, str]:
    """Check that a user has access to this service for the requested
    operation based on group membership in the token's "isMemberOf" claim.
    :param capability: The capability we are checking against
    :param request_method: The operation requested for this service
    :param request_path: The URI that will be tested
    :param token: The verified token
    :rtype: Tuple[bool, str]
    :returns: (successful, message) with successful as True if the user is a
    member of the group derived from the capability, otherwise (False, message)
    """
    user_groups = token.get("isMemberOf", [])
    capability_group = _group_membership_get_group(capability)
    if capability_group in user_groups:
        return True, "Success"
    return False, "No capability group found in user's `isMemberOf`"


def _group_membership_get_group(capability: str) -> str:
    """
    Given a capability, find a group that represents this capability.
    :param capability: The capability in question
    :return: A string value of the group for this capability.
    """
    (op, resource) = capability.split(":")
    prefix = RESOURCE_TO_ABSTRACT_GROUP_PREFIX[resource]
    postfix = OP_TO_ABSTRACT_GROUP_POSTFIX[op]
    abstract_group = f"{prefix}{postfix}"
    # Special cases that override the generic prefix/postfix mapping
    if capability == "exec:portal":
        abstract_group = "portal_r"
    elif capability == "exec:notebook":
        abstract_group = "nb_x"
    group = f"{Config.GROUP_DEPLOYMENT_PREFIX}{abstract_group}"
    return group
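
# Example of the mapping above: with the default deployment prefix "lsst_",
# "read:workspace" maps to the group "lsst_ws_r" and "exec:notebook" maps to
# "lsst_nb_x" (actual group names depend on deployment configuration).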


def webdav_check_access(capability: str, request_method: str, request_path: str,
                        token: Dict[str, Any]) -> Tuple[bool, str]:
    """Check that a user has access to this service for the requested
    operation and the file path in question for a WebDAV service.
    :param capability: The capability we are checking against
    :param request_method: The operation requested for this service
    :param request_path: The URI that will be tested
    :param token: The verified token
    :returns: (successful, message) with successful as True if the configured
    WebDAV authorizer allows the operation on the file, otherwise (False, message)
    """
    # Check impersonation against the path on disk
    service_path = Config.WEBDAV_SERVICE_PATH
    assert request_path.startswith(service_path), "ERROR: Nginx WebDAV misconfiguration"
    # Remove the service prefix so we are left with the path on disk
    filepath_on_disk = request_path.replace(service_path, "", 1)
    (op, resource) = capability.split(":")
    webdav_authorizer = WEBDAV_AUTHORIZERS[Config.WEBDAV_AUTHORIZER]
    if webdav_authorizer(token, op, filepath_on_disk):
        return True, ""
    return False, "Path not allowed"


def webdav_null_authorizer(token: Dict[str, Any], op: str, filepath_on_disk: str) -> bool:
    return True


def webdav_sudo_authorizer(token: Dict[str, Any], op: str, filepath_on_disk: str) -> bool:
    test_option = "-w" if op == "write" else "-r"
    params = ["sudo", "-u", token["sub"], "test", test_option, filepath_on_disk]
    logger.debug("Executing Impersonation Test: " + " ".join(params))
    return_code = subprocess.call(params)
    return return_code == 0


RESOURCE_TO_ABSTRACT_GROUP_PREFIX = {
    "image": "img",
    "image/metadata": "img_md",
    "tap": "tap",
    "tap/efd": "tap_efd",
    "tap/user": "tap_usr",
    "tap/history": "tap_hist",
    "workspace": "ws",
    "workspace/user": "ws_usr",
    "portal": "portal",
    "notebook": "nb"
}

OP_TO_ABSTRACT_GROUP_POSTFIX = {
    "read": "_r",
    "write": "_w",
    "exec": "_x"
}

WEBDAV_AUTHORIZERS = {
    "none": webdav_null_authorizer,
    "sudo": webdav_sudo_authorizer
}

CHECK_ACCESS_CALLABLES = {
    "scp": scp_check_access,
    "group_membership": group_membership_check_access,
    "webdav": webdav_check_access
}


@cached(cache=TTLCache(maxsize=16, ttl=600))
def get_key_as_pem(request_issuer, request_key_id):
    def _base64_to_long(data):
        data = data.encode('ascii')
        decoded = base64.urlsafe_b64decode(bytes(data) + b'==')
        unpacked = struct.unpack('%sB' % len(decoded), decoded)
        key_as_long = int(''.join(['{:02x}'.format(b) for b in unpacked]), 16)
        return key_as_long

    def _convert(exponent, modulus):
        components = RSAPublicNumbers(exponent, modulus)
        pub = components.public_key(backend=default_backend())
        return pub.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)

    pem_keys = {}
    for issuer in Config.AUTHORIZED_ISSUERS:
        oidc_config = os.path.join(issuer, ".well-known/openid-configuration")
        try:
            oidc_resp = requests.get(oidc_config)
            oidc_resp.raise_for_status()
            jwks_uri = oidc_resp.json()["jwks_uri"]
            keys_resp = requests.get(jwks_uri)
            keys_resp.raise_for_status()
            keys = keys_resp.json()["keys"]
            for key in keys:
                kid = key["kid"]
                if key["alg"] == Config.ALGORITHM:
                    e = _base64_to_long(key['e'])
                    m = _base64_to_long(key['n'])
                    pem_keys.setdefault(issuer, {})[kid] = _convert(e, m)
        except (KeyError, HTTPError) as err:
            logger.error(f"Unable to store key for issuer: {issuer}")
            logger.error(err)
            raise
    return pem_keys[request_issuer][request_key_id]
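
# get_key_as_pem expects each issuer's JWKS to publish RS256 keys; only the
# "kid", "alg", "e", and "n" members are consumed above. A made-up entry:
#
#   {"keys": [{"kid": "key-1", "kty": "RSA", "alg": "RS256",
#              "e": "AQAB", "n": "<base64url-encoded modulus>"}]}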


def configure():
    parser = argparse.ArgumentParser(description='Authenticate HTTP Requests')
    parser.add_argument('-c', '--config', dest='config', type=str,
                        default="/etc/authorizer.cfg",
                        help="Location of the configuration file")
    args = parser.parse_args()
    # Read in configuration
    Config.load(args.config)


configure()


def main():
    # Set up the listener for auth requests
    app.run(host='localhost', port=8080)


if __name__ == "__main__":
    main()
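
# Quick smoke test of the /auth endpoint (a sketch; the token, resource name,
# and original URI are assumptions about your deployment):
#
#   import requests
#   token = "<a JWT issued by one of the configured issuers>"
#   resp = requests.get("http://localhost:8080/auth",
#                       params={"resource": "workspace"},
#                       headers={"Authorization": f"Bearer {token}",
#                                "X-Original-Method": "GET",
#                                "X-Original-URI": "/api/webdav/somefile"})
#   print(resp.status_code, resp.text)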