Skip to content

Instantly share code, notes, and snippets.

@siteshen
Created January 6, 2014 06:08
Show Gist options
  • Select an option

  • Save siteshen/8278913 to your computer and use it in GitHub Desktop.

Select an option

Save siteshen/8278913 to your computer and use it in GitHub Desktop.
A Mixin for Tornado and CAS client specification.
from urllib import urlencode, urlopen
import json
from tornado.web import RequestHandler
class CasClientMixin(object):
@property
def cas_server_url(self):
return 'http://www.example.com/cas'
@property
def service_url(self):
return self.request.full_url()
def get_next_url(self, default='/'):
return self.get_argument('next', default)
def get_login_url(self):
params = {'service': self.service_url}
return '%s/login?%s' % (self.cas_server_url, urlencode(params))
def get_logout_url(self, next_page=None):
url = '%s/logout' % self.cas_server_url
next_page = next_page or self.get_next_url()
if next_page:
params = {
'url': "%s://%s%s" % (
self.request.protocol, self.request.host,
self.request.uri,
),
}
url += '?' + urlencode(params)
return url
def verify_cas(self, *args, **kwargs):
return self._verify_cas3(*args, **kwargs)
# https://bitbucket.org/cpcc/django-cas/src/default/django_cas/backends.py
def _verify_cas3(self, ticket, service):
"""Verifies CAS 3.0+ XML-based authentication ticket and returns extended attributes.
Returns username on success and None on failure.
"""
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
params = {'ticket': ticket, 'service': service}
url = '%s/proxyValidate?%s' % (self.cas_server_url, urlencode(params))
page = urlopen(url)
try:
user = None
attributes = {}
response = page.read()
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('authenticationSuccess'):
for element in tree[0]:
if element.tag.endswith('user'):
user = element.text
elif element.tag.endswith('attributes'):
for attribute in element:
attributes[attribute.tag.split("}").pop()] = attribute.text
return user, attributes
finally:
page.close()
# Example Usage: BaseHandler, LoginHandler, LogoutHandler
class BaseHandler(RequestHandler):
def get_current_user(self):
attributes = self.get_secure_cookie('cas_attributes')
if attributes:
return json.loads(attributes)
class LoginHandler(CasClientMixin, BaseHandler):
def get(self):
if self.current_user:
return self.write({'message': 'login success'})
ticket = self.get_argument('ticket', None)
if ticket:
username, attributes = self.verify_cas(ticket, self.service_url)
self.set_secure_cookie('cas_attributes', json.dumps(attributes))
return self.redirect('/')
else:
return self.redirect(self.get_login_url())
class LogoutHandler(CasClientMixin, BaseHandler):
def get(self):
if not self.current_user:
return self.redirect(self.get_next_url())
else:
self.clear_cookie('cas_attributes')
return self.redirect(self.get_logout_url('/logout'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment