Skip to content

Instantly share code, notes, and snippets.

@brianv0
Created December 12, 2018 23:39
Show Gist options
  • Save brianv0/c6a5a75d3fc5c92ad9142e6bc33a134a to your computer and use it in GitHub Desktop.
Save brianv0/c6a5a75d3fc5c92ad9142e6bc33a134a to your computer and use it in GitHub Desktop.
Authorizer
import argparse
import base64
import configparser
import errno
import json
import logging
import os
import re
import struct
import subprocess
from typing import Dict, Any, Tuple, Optional, Callable, List
import jwt
import requests
from cachetools import TTLCache, cached
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from flask import Flask, request, Response
from jwt import InvalidTokenError, InvalidIssuerError
from requests import HTTPError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
class Config:
GLOBAL_AUDIENCE = ""
AUTHORIZED_ISSUERS = {}
DEFAULT_RESOURCE = ""
ALGORITHM = "RS256"
JWT_OPTIONS = {}
RESOURCE_CHECKS: Dict[str, List] = {"default": ["group_membership"]}
NO_VERIFY = False
NO_AUTHORIZE = False
REALM = "tokens"
WWW_AUTHENTICATE = "Bearer"
WEBDAV_SERVICE_PATH = ""
WEBDAV_AUTHORIZER = "sudo"
GROUP_DEPLOYMENT_PREFIX = "lsst_"
@staticmethod
def load(fname):
global logger
logger.info("Loading configuration from %s" % fname)
cp = configparser.ConfigParser()
try:
with open(fname, "r") as fp:
cp.read_file(fp)
except IOError as ie:
if ie.errno == errno.ENOENT:
return
raise
# Logging
if "loglevel" in cp.options("Global"):
level = cp.get("Global", "loglevel")
logger.info(f"Reconfiguring log, level={level}")
# Reconfigure logging
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(level=level)
logger = logging.getLogger(__name__)
if level == "DEBUG":
logging.getLogger('werkzeug').setLevel(level)
# Globals
if 'audience_json' in cp.options("Global"):
# Read in the audience as json. Hopefully it's in list format or a string
Config.GLOBAL_AUDIENCE = json.loads(cp.get("Global", "audience_json"))
elif 'audience' in cp.options("Global"):
Config.GLOBAL_AUDIENCE = cp.get("Global", "audience")
if ',' in Config.GLOBAL_AUDIENCE:
# Split the audience list
Config.GLOBAL_AUDIENCE = re.split("\s*,\s*", Config.GLOBAL_AUDIENCE)
if 'default_resource' in cp.options("Global"):
Config.DEFAULT_RESOURCE = cp.get("Global", "default_resource")
if 'realm' in cp.options("Global"):
Config.REALM = cp.get("Global", "realm")
logger.info(f"Configured realm {Config.REALM}")
if 'www_authenticate' in cp.options("Global"):
Config.WWW_AUTHENTICATE = cp.get("Global", "www_authenticate")
logger.info(f"Configured WWW-Authenticate type: {Config.WWW_AUTHENTICATE}")
if "no_verify" in cp.options("Global"):
Config.NO_VERIFY = cp.getboolean("Global", "no_verify")
logger.warning("Authentication verification is disabled")
if "no_authorize" in cp.options("Global"):
Config.NO_AUTHORIZE = cp.getboolean("Global", "no_authorize")
logger.warning("Authorization is disabled")
if 'webdav_service_path' in cp.options("Global"):
webdav_service_path = cp.get("Global", "webdav_service_path")
Config.WEBDAV_SERVICE_PATH = webdav_service_path
logger.info(f"Configured WebDAV service path as: {webdav_service_path}")
else:
logger.warning("No WebDAV service path defined for application")
if 'webdav_authorizer' in cp.options("Global"):
authorizer = cp.get("Global", "webdav_authorizer")
if authorizer not in WEBDAV_AUTHORIZERS:
raise Exception("No Valid WebDAV authorizer found")
Config.WEBDAV_AUTHORIZER = authorizer
logger.info(f"Configured WebDAV authorizer: {authorizer}")
if 'group_deployment_prefix' in cp.options("Global"):
prefix = cp.get("Global", "group_deployment_prefix")
Config.GROUP_DEPLOYMENT_PREFIX = prefix
logger.info(f"Configured Group Deployment Prefix: {prefix}")
# Find JWT options
for option_name in cp.options("Global"):
if option_name.startswith("jwt_"):
key = option_name[len("jwt_"):]
value = cp.get("Global", option_name)
Config.JWT_OPTIONS[key] = value
# Find Resource Check Callables
for option_name in cp.options("Global"):
if option_name.startswith("resource_checks_"):
key = option_name[len("resource_checks_"):]
values = json.loads(cp.get("Global", option_name))
if not isinstance(values, list):
raise Exception("Resource checks not a list:")
for callable_name in values:
if callable_name not in CHECK_ACCESS_CALLABLES:
raise Exception(f"No access checker for id {callable_name}")
Config.RESOURCE_CHECKS[key] = values
for resource, callables in Config.RESOURCE_CHECKS.items():
logger.info(f"Configured resource checks: {resource} - {callables}")
# Sections
for section in cp.sections():
logger.debug(f"Processing Section {section}")
if not section.lower().startswith("issuer "):
continue
if 'issuer' not in cp.options(section):
logger.warning(f"Ignore section {section} as it has no `issuer`")
continue
issuer = cp.get(section, 'issuer')
issuer_info = Config.AUTHORIZED_ISSUERS.setdefault(issuer, {})
if 'map_subject' in cp.options(section):
issuer_info['map_subject'] = cp.getboolean(section, 'map_subject')
logger.info(f"Configured token access for {section} (issuer {issuer}): {issuer_info}")
logger.info("Configured Issuers")
@app.route('/auth')
def flask_listener():
"""
Authenticate and authorize a token.
"""
# Default to Server Error for safety, so we must always set it to 200
# if it's okay.
response = Response(status=500)
if 'Authorization' not in request.headers and "x-oauth-basic" not in request.cookies:
response = _needs_authentication(response, "No Authorization header", "")
return response
auth_type, auth_blob = request.headers['Authorization'].split(" ")
encoded_token = None
if "x-oauth-basic" in request.cookies:
encoded_token = request.cookies["x-oauth-basic"]
elif auth_type.lower() == "bearer":
encoded_token = auth_blob
elif auth_type.lower() == "basic":
logger.debug("Using OAuth with Basic")
# We fallback to user:token. We ignore the user.
# The Token is in the password
encoded_basic_auth = auth_blob
basic_auth = base64.b64decode(encoded_basic_auth)
user, password = basic_auth.strip().split(b":")
if password == "x-oauth-basic":
# Recommended default
encoded_token = user
elif user == "x-oauth-basic":
# ... Could be this though
logger.warning("Protocol `x-oauth-basic` should be in password field")
encoded_token = password
else:
logger.info("No protocol for token specified")
encoded_token = user
# Convert the token
# Send a 401 error code if there is any problem
try:
unverified_header = jwt.get_unverified_header(encoded_token)
unverified_token = jwt.decode(encoded_token, verify=False)
except InvalidTokenError as e:
response = _needs_authentication(response, "Invalid Token", str(e))
logger.exception("Failed to decode Token")
logger.exception(e)
return response
try:
if Config.NO_VERIFY:
logger.debug("Skipping Verification of the token")
verified_token = unverified_token
else:
iss = unverified_token['iss']
if iss not in Config.AUTHORIZED_ISSUERS:
raise InvalidIssuerError(f"Unauthorized Issuer: {iss}")
key = get_key_as_pem(unverified_token["iss"], unverified_header["kid"])
verified_token = jwt.decode(encoded_token,
key,
algorithm=Config.ALGORITHM,
audience=Config.GLOBAL_AUDIENCE,
options=Config.JWT_OPTIONS)
except Exception as e:
response = _needs_authentication(response, "Invalid Token", str(e))
logger.exception("Failed to deserialize Token")
logger.exception(e)
return response
if "Basic" in request.headers["Authorization"] and "x-oauth-basic" not in request.cookies:
response.set_cookie("x-oauth-basic", encoded_token)
if Config.NO_AUTHORIZE:
response.set_data("Authorization is Ok")
response.status_code = 200
return response
# Authorization Checks
request_method = request.headers.get('X-Original-Method')
request_path = request.headers.get('X-Original-URI')
resource = request.args.get("resource") or Config.DEFAULT_RESOURCE
capability = request.args.get("capability") or get_capability(resource, request_method,
request_path)
assert capability is not None, "ERROR: Check nginx configuration for this resource"
jti = str(verified_token['jti']) if "jti" in verified_token else None
(success, message) = check_authorization(capability, request_method, request_path,
verified_token)
response.set_data(message)
if success:
response.status_code = 200
if jti:
logger.info(f"Allowed token with Token ID: {jti}")
return response
if jti:
logger.error(f"Failed to authenticate Token ID {jti} because {message}")
else:
logger.error(f"Failed to authenticate Token because {message}")
response.status_code = 403
return response
def _needs_authentication(response: Response, error: str, message: str) -> Response:
"""Modify request for a 401 as appropriate"""
response.status_code = 401
response.set_data(error)
if not Config.WWW_AUTHENTICATE:
return response
if Config.WWW_AUTHENTICATE.lower() == "basic":
# Otherwise, send Bearer
response.headers['WWW-Authenticate'] = \
f'Basic realm="{Config.REALM}"'
else:
response.headers['WWW-Authenticate'] = \
f'Bearer realm="{Config.REALM}",error="{error}",error_description="{message}"'
return response
def get_capability(resource: str, request_method: str, request_path: str) -> Optional[str]:
"""
Get the capability for this request.
:param resource: Resource for the request
:param request_method: Original request method
:param request_path: Original request path
:return: A string if we were able to determin the capability, or None
"""
if resource == "workspace":
op = ""
if request_method in ["GET", "OPTIONS", "PROPFIND"]:
op = 'read'
elif request_method in ["PUT", "POST", "DELETE", "MKCOL", "COPY", "MOVE"]:
op = 'write'
return f"{op}:{resource}"
return None
def check_authorization(capability: str, request_method: str, request_path: str,
verified_token: Dict[str, Any]) -> Tuple[bool, str]:
"""
Check the authorization of the request based on the original method,
request path, and token.
:param capability: The capability we are authorizing
:param request_method: Original HTTP method
:param request_path: The Original request path
:param verified_token: The verified token
:rtype: Tuple[bool, str]
:returns: (True, message) with successful as True if the
all checks pass, otherwiss returns (False, message)
"""
(op, resource) = capability.split(":")
check_access_callables = get_check_access_functions(resource)
successes = []
message = None
for check_access in check_access_callables:
(successful, message) = check_access(capability, request_method, request_path,
verified_token)
if not successful:
break
successes.append(successful)
success = sum(successes) == len(check_access_callables)
return success, message
def get_check_access_functions(resource: str) -> List[Callable]:
"""
Return the check access callable for a resource
:param resource:
:return: A callable for check access
"""
checker_names = Config.RESOURCE_CHECKS.get(resource)
if not checker_names:
checker_names = Config.RESOURCE_CHECKS.get("default")
callables = []
for checker_name in checker_names:
callables.append(CHECK_ACCESS_CALLABLES[f"{checker_name}"])
return callables
def scp_check_access(capability: str, request_method: str, request_path: str,
token: Dict[str, Any]) -> Tuple[bool, str]:
"""Check that a user has access with the following operation to this
service based on the assumption the token has a "scp" claim.
:param capability: The capability we are checking against
:param request_method: The operation requested for this service
:param request_path: The uri that will be tested
:param token: The token necessary
:rtype: Tuple[bool, str]
:returns: (successful, message) with successful as True if the
scitoken allows for op and the user can read/write the file, otherwise
return (False, message)
"""
capabilites = set(token.get("scp"))
if capability in capabilites:
return True, "Success"
return False, f"No capability found: {capability}"
def group_membership_check_access(capability: str, request_method: str, request_path: str,
token: Dict[str, Any]) -> Tuple[bool, str]:
"""Check that a user has access with the following operation to this service
based on some form of group membership.
:param capability: The capability we are checking against
:param request_method: The operation requested for this service
:param request_path: The uri that will be tested
:param token: The token necessary
:rtype: Tuple[bool, str]
:returns: (successful, message) with successful as True if the
scitoken allows for op and the user can read/write the file, otherwise
return (False, message)
"""
user_groups = token.get("isMemberOf")
capability_group = _group_membership_get_group(capability)
if capability_group in user_groups:
return True, "Success"
return False, "No Capability group found in user's `isMemberOfGroups`"
def _group_membership_get_group(capability: str) -> str:
"""
Given a capability, find a group that represents this capability.
:param capability: The capability in question
:return: A string value of the group for this capability.
"""
(op, resource) = capability.split(":")
prefix = RESOURCE_TO_ABSTRACT_GROUP_PREFIX[resource]
postfix = OP_TO_ABSTRACT_GROUP_POSTFIX[op]
abstract_group = f"{prefix}{postfix}"
abstract_group = None
if capability == "exec:portal":
abstract_group = "portal_r"
elif capability == "exec:notebook":
abstract_group = "nb_x"
group = f"{Config.GROUP_DEPLOYMENT_PREFIX}{abstract_group}"
return group
def webdav_check_access(capability: str, request_method: str, request_path: str,
token: Dict[str, Any]) -> Tuple[bool, str]:
"""Check that a user has access with the following operation to this service
and the file path in question for a WebDAV service.
:param capability: The capability we are checking against
:param request_method: The operation requested for this service
:param request_path: The uri that will be tested
:param token: The token necessary
:returns: (successful, message) with successful as True if the
scitoken allows for op and the user can read/write the file, otherwise
return (False, message)
"""
# Check Impersonation Next
service_path = Config.WEBDAV_SERVICE_PATH
assert request_path.startswith(service_path), "ERROR: Nginx WebDAV misconfiguration"
# Now remove the base request_path so we just get the auth_path + request_path
filepath_on_disk = request_path.replace(service_path, "", 1)
(op, resource) = capability.split(":")
webdav_authorizer = WEBDAV_AUTHORIZERS[Config.WEBDAV_AUTHORIZER]
if webdav_authorizer(token, op, filepath_on_disk):
return True, ""
return False, "Path not allowed"
def webdav_null_authorizer(token: Dict[str, Any], op: str, filepath_on_disk: str) -> bool:
return True
def webdav_sudo_authorizer(token: Dict[str, Any], op: str, filepath_on_disk: str) -> bool:
test_option = "-w" if op == "write" else "-r"
params = ["sudo", "-u", token["sub"], "test", test_option, filepath_on_disk]
logger.debug("Executing Impersonation Test: " + " ".join(params))
return_code = subprocess.call(params)
return return_code == 0
RESOURCE_TO_ABSTRACT_GROUP_PREFIX = {
"image": "img",
"image/metadata": "img_md",
"tap": "tap",
"tap/efd": "tap_efd",
"tap/user": "tap_usr",
"tap/history": "tap_hist",
"workspace": "ws",
"workspace/user": "ws_usr",
"portal": "portal",
"notebook": "nb"
}
OP_TO_ABSTRACT_GROUP_POSTFIX = {
"read": "_r",
"write": "_w",
"exec": "_x"
}
WEBDAV_AUTHORIZERS = {
"none": webdav_null_authorizer,
"sudo": webdav_sudo_authorizer
}
CHECK_ACCESS_CALLABLES = {
"scp": scp_check_access,
"group_membership": group_membership_check_access,
"webdav": webdav_check_access
}
@cached(cache=TTLCache(maxsize=16, ttl=600))
def get_key_as_pem(request_issuer, request_key_id):
def _base64_to_long(data):
data = data.encode('ascii')
decoded = base64.urlsafe_b64decode(bytes(data) + b'==')
unpacked = struct.unpack('%sB' % len(decoded), decoded)
key_as_long = int(''.join(['{:02x}'.format(b) for b in unpacked]), 16)
return key_as_long
def _convert(exponent, modulus):
components = RSAPublicNumbers(e, m)
pub = components.public_key(backend=default_backend())
return pub.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
pem_keys = {}
for issuer in Config.AUTHORIZED_ISSUERS:
oidc_config = os.path.join(issuer, ".well-known/openid-configuration")
try:
oidc_resp = requests.get(oidc_config)
oidc_resp.raise_for_status()
jwks_uri = oidc_resp.json()["jwks_uri"]
keys_resp = requests.get(jwks_uri)
keys_resp.raise_for_status()
keys = keys_resp.json()["keys"]
for key in keys:
kid = key["kid"]
if key["alg"] == Config.ALGORITHM:
e = _base64_to_long(key['e'])
m = _base64_to_long(key['n'])
pem_keys.setdefault(issuer, {})[kid] = _convert(e, m)
except (KeyError, HTTPError) as e:
logger.error(f"Unable to store key for issuer: {issuer} ")
logger.error(e)
raise e
return pem_keys[request_issuer][request_key_id]
def configure():
parser = argparse.ArgumentParser(description='Authenticate HTTP Requests')
parser.add_argument('-c', '--config', dest='config', type=str,
default="/etc/authorizer.cfg",
help="Location of the configuration file")
args = parser.parse_args()
# Read in configuration
Config.load(args.config)
configure()
def main():
# Set up listener for events
app.run(host='localhost', port=8080)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment