Skip to content

Instantly share code, notes, and snippets.

@om2c0de
Created July 17, 2020 09:25
Show Gist options
  • Save om2c0de/11588a8ba142b4f7822293bd932b5aad to your computer and use it in GitHub Desktop.
Save om2c0de/11588a8ba142b4f7822293bd932b5aad to your computer and use it in GitHub Desktop.
Kerberos + JWT + Basic + Negotiate authentications using tornado web framework.
import base64
import json
import logging
import os
import sys
from abc import ABC
import jwt
import tornado.escape
import tornado.httpserver
import tornado.ioloop
import tornado.web
# Platform-specific Kerberos requirements
if sys.platform == 'win32':
import kerberos_sspi as kerberos
import pywintypes
pywintypes_error = pywintypes.error
else:
import kerberos
pywintypes_error = OSError
# Initialize logger
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
class HeadersAlreadyWrittenException(Exception):
pass
class BaseKerberosAuthHandler(tornado.web.RequestHandler, ABC):
"""
Authenticates users via Kerberos-based Single Sign-On. Requires that you
define 'sso_realm' and 'sso_service' in your Tornado Application settings.
For example::
settings = dict(
cookie_secret='iYR123qg4UUdsgf4CRung6BFUBhizAciid8oq1YfJR3gN',
static_path=os.path.join(os.path.dirname(__file__), 'static'),
gzip=True,
login_url='/auth',
debug=True,
sso_realm='EXAMPLE.COM',
sso_service='HTTP' # Should pretty much always be HTTP
)
NOTE: If you're using 'HTTP' as the service it must be in all caps or it
might not work with some browsers/clients (which auto-capitalize all
services).
"""
def initialize(self):
"""
Print out helpful error messages if the requisite settings aren't
configured.
NOTE: It won't hurt anything to override this method in your
RequestHandler.
"""
self.require_setting('sso_realm', 'Kerberos/GSSAPI Single Sign-On')
self.require_setting('sso_service', 'Kerberos/GSSAPI Single Sign-On')
self.require_setting('jwt_secret_key', 'JWT secret server key')
def get_authenticated_user(self, callback):
"""
Processes the client's Authorization header and calls
self.auth_negotiate() or self.auth_basic() depending on what headers
were provided by the client.
"""
keytab = self.settings.get('sso_keytab', None)
if keytab:
# The kerberos module does not take a keytab as a parameter when
# performing authentication but you can still specify it via an
# environment variable:
os.environ['KRB5_KTNAME'] = keytab
auth_header = self.request.headers.get('Authorization', None)
if auth_header and auth_header.startswith('Negotiate'):
self.auth_negotiate(auth_header, callback)
elif auth_header and auth_header.startswith('Basic'):
self.auth_basic(auth_header, callback)
elif auth_header and auth_header.startswith('Bearer'):
self.auth_jwt(auth_header, callback)
def auth_negotiate(self, auth_header, callback):
"""
Perform Negotiate (GSSAPI/SSO) authentication via Kerberos.
"""
auth_str = auth_header.split()[1]
# Initialize Kerberos Context
context = None
try:
result, context = kerberos.authGSSServerInit(self.settings['sso_service'])
if result is not kerberos.AUTH_GSS_COMPLETE:
raise tornado.web.HTTPError(500, 'Kerberos Init failed')
result = kerberos.authGSSServerStep(context, auth_str)
if result is kerberos.AUTH_GSS_COMPLETE:
gss_string = kerberos.authGSSServerResponse(context)
self.set_header('WWW-Authenticate', f'Negotiate {gss_string}')
else: # Fall back to Basic auth
self.auth_basic(auth_header, callback)
# NOTE: The user we get from Negotiate is a full UPN (user@REALM)
user = kerberos.authGSSServerUserName(context)
except (kerberos.GSSError, pywintypes_error) as e:
logging.error(f'Kerberos Error: {e}')
raise tornado.web.HTTPError(500, 'Kerberos Init failed')
finally:
if context:
kerberos.authGSSServerClean(context)
callback(user)
def auth_basic(self, auth_header, callback):
"""
Perform Basic authentication using Kerberos against
`self.settings['sso_realm']`.
"""
auth_decoded = base64.b64decode(auth_header[6:]).decode('utf-8')
username, password = auth_decoded.split(':', 1)
try:
kerberos.checkPassword(username, password, self.settings['sso_service'], self.settings['sso_realm'])
except Exception as e: # Basic auth failed
if self.settings['debug']:
print(e) # Very useful for debugging Kerberos errors
return self.authenticate_redirect()
# NOTE: Basic auth just gives us the username without the @REALM part
# so we have to add it:
user = f'{username}@{self.settings["sso_realm"]}'
callback(user)
def auth_jwt(self, auth_header, callback):
jwt_token = auth_header.split()[1]
try:
user_data = jwt.decode(jwt_token, self.settings['jwt_secret_key'], algorithms=['HS256'])
username = user_data['username']
password = user_data['password']
kerberos.checkPassword(username, password, self.settings['sso_service'], self.settings['sso_realm'])
except Exception as e:
logging.error(f'JWT Error: {e}')
raise tornado.web.HTTPError(401, 'Invalid authentication token.')
user = f'{username}@{self.settings["sso_realm"]}'
callback(user)
def authenticate_redirect(self):
"""
Informs the browser that this resource requires authentication (status
code 401) which should prompt the browser to reply with credentials.
The browser will be informed that we support both Negotiate (GSSAPI/SSO)
and Basic auth.
"""
# NOTE: I know this isn't technically a redirect but I wanted to make
# this process as close as possible to how things work in tornado.auth.
if self._headers_written:
raise HeadersAlreadyWrittenException
self.set_status(401)
self.add_header("WWW-Authenticate", "Negotiate")
self.add_header("WWW-Authenticate", f'Basic realm="{self.settings["realm"]}"')
self.add_header("WWW-Authenticate", f'Bearer realm="{self.settings["realm"]}"')
self.finish()
return False
class KerberosAuthHandler(BaseKerberosAuthHandler, ABC):
def get(self):
"""
Provides Basic, Negotiate or JWT authentication for the clients.
"""
auth_header = self.request.headers.get('Authorization', None)
if auth_header is None:
self.authenticate_redirect()
self.get_authenticated_user(self._on_auth)
self.write(f'Auth header is: {auth_header}')
def post(self):
"""
Provides JWT token obtaining for the clients.
"""
username = self.get_argument('username')
password = self.get_argument('password')
jwt_token = self.obtain_jwt_token(username, password)
jwt_data = {'token': jwt_token}
self.write(json.dumps(jwt_data))
def get_current_user(self):
return self.get_secure_cookie('user')
def obtain_jwt_token(self, username, password):
payload = {'username': username, 'password': password}
jwt_token = jwt.encode(payload, self.settings['jwt_secret_key'], algorithm='HS256')
return jwt_token.decode('utf-8')
def _on_auth(self, user):
if user is None:
raise tornado.web.HTTPError(500, 'Kerberos auth failed')
self.set_secure_cookie('user', tornado.escape.json_encode(user))
# Disable redirection for now
# next_url = self.get_argument('next', '/') # To redirect properly
# self.redirect(next_url)
class ProtectedUserHandler(KerberosAuthHandler, ABC):
@tornado.web.authenticated
def get(self):
# NOTE: Auth gives us the username with the @REALM part, so we can just cut it
username, _ = self.current_user.decode("utf-8").split('@')
self.write(f'Hello, {username}! It is your personal endpoint.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment