Created
January 6, 2014 06:08
-
-
Save siteshen/8278913 to your computer and use it in GitHub Desktop.
A Mixin for Tornado and CAS client specification.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from urllib import urlencode, urlopen | |
| import json | |
| from tornado.web import RequestHandler | |
class CasClientMixin(object):
    """Mixin adding CAS (Central Authentication Service) client helpers.

    The host class must provide ``self.request`` (with ``full_url()``,
    ``protocol``, ``host`` and ``uri``) and ``self.get_argument()``.
    Override :attr:`cas_server_url` to point at the real CAS server.
    """

    @property
    def cas_server_url(self):
        """Base URL of the CAS server, without a trailing slash."""
        return 'http://www.example.com/cas'

    @property
    def service_url(self):
        """URL identifying this service to the CAS server.

        This is the current request URL with any ``ticket`` query parameter
        removed.  CAS appends ``ticket=...`` to the service URL when it
        redirects back, and ticket validation must be given the same service
        the ticket was issued for, so the parameter is stripped here.
        """
        full_url = self.request.full_url()
        base, has_query, query = full_url.partition('?')
        if not has_query:
            return full_url

        # Filter textually rather than parse/re-encode so that every other
        # parameter is passed through exactly as it was received.
        pairs = query.split('&')
        kept = [pair for pair in pairs if pair.split('=', 1)[0] != 'ticket']
        if len(kept) == len(pairs):
            return full_url
        if not kept:
            return base
        return base + '?' + '&'.join(kept)

    def get_next_url(self, default='/'):
        """Return the ``next`` request argument, or ``default`` if absent."""
        return self.get_argument('next', default)

    def get_login_url(self):
        """Return the CAS login URL carrying :attr:`service_url` as ``service``."""
        params = {'service': self.service_url}
        return '%s/login?%s' % (self.cas_server_url, urlencode(params))

    def get_logout_url(self, next_page=None):
        """Return the CAS logout URL.

        A ``url`` parameter is appended when ``next_page`` (or, failing
        that, the ``next`` request argument, defaulting to ``'/'``) is
        non-empty.
        """
        url = '%s/logout' % self.cas_server_url
        next_page = next_page or self.get_next_url()
        if next_page:
            # NOTE(review): the value of ``next_page`` only gates this branch;
            # the address sent to CAS is always the current request URL
            # (including its query string).  Kept as-is because callers may
            # rely on the query string surviving the round trip -- confirm
            # whether ``next_page`` was meant to be sent instead.
            params = {
                'url': "%s://%s%s" % (
                    self.request.protocol, self.request.host,
                    self.request.uri,
                ),
            }
            url += '?' + urlencode(params)
        return url

    def verify_cas(self, *args, **kwargs):
        """Validate a ticket; delegates to :meth:`_verify_cas3`."""
        return self._verify_cas3(*args, **kwargs)

    # https://bitbucket.org/cpcc/django-cas/src/default/django_cas/backends.py
    def _verify_cas3(self, ticket, service):
        """Validate a CAS 3.0+ ticket against the server's ``proxyValidate``.

        Performs a blocking HTTP request to the CAS server.

        Returns a ``(user, attributes)`` tuple.  On success ``user`` is the
        username and ``attributes`` maps attribute names to their text; on
        failure the result is ``(None, {})``.  Network and XML parsing
        errors are not caught here.
        """
        params = {'ticket': ticket, 'service': service}
        url = '%s/proxyValidate?%s' % (self.cas_server_url, urlencode(params))
        page = urlopen(url)
        try:
            response = page.read()
        finally:
            page.close()
        return self._parse_cas3_response(response)

    def _parse_cas3_response(self, response):
        """Extract ``(user, attributes)`` from a CAS validation XML document."""
        try:
            from xml.etree import ElementTree
        except ImportError:
            from elementtree import ElementTree

        user = None
        attributes = {}
        tree = ElementTree.fromstring(response)
        # An empty root element carries no verdict; treat it as a failure
        # instead of raising IndexError on tree[0].
        if len(tree) > 0 and tree[0].tag.endswith('authenticationSuccess'):
            for element in tree[0]:
                if element.tag.endswith('user'):
                    user = element.text
                elif element.tag.endswith('attributes'):
                    for attribute in element:
                        # Drop the "{namespace}" prefix from the tag name.
                        attributes[attribute.tag.split("}").pop()] = attribute.text
        return user, attributes
# Example usage: BaseHandler, LoginHandler, and LogoutHandler.
class BaseHandler(RequestHandler):
    """Request handler whose current user comes from a signed cookie."""

    def get_current_user(self):
        """Return the decoded ``cas_attributes`` cookie, or None if unset.

        The cookie value is expected to be a JSON document.
        """
        raw_cookie = self.get_secure_cookie('cas_attributes')
        if not raw_cookie:
            return None
        return json.loads(raw_cookie)
class LoginHandler(CasClientMixin, BaseHandler):
    """Log a user in through CAS and remember them in a signed cookie."""

    def get(self):
        """Handle the login request and the CAS callback.

        * Already logged in: write a small success message.
        * No ``ticket`` argument: redirect to the CAS login page.
        * ``ticket`` present: validate it.  On success store the user's
          attributes (plus the username under ``user`` unless CAS already
          supplied an attribute of that name) in the ``cas_attributes``
          cookie and redirect to ``/``; on failure respond with HTTP 403.
        """
        if self.current_user:
            return self.write({'message': 'login success'})

        ticket = self.get_argument('ticket', None)
        if not ticket:
            return self.redirect(self.get_login_url())

        username, attributes = self.verify_cas(ticket, self.service_url)
        if username is None:
            # The ticket was rejected: do not set a cookie or pretend the
            # login worked.
            return self.send_error(403)

        # Keep the username alongside the attributes.  Without it, a CAS
        # server that returns no attributes would leave an empty (falsy)
        # object in the cookie and the user would never appear logged in.
        session = dict(attributes)
        session.setdefault('user', username)
        self.set_secure_cookie('cas_attributes', json.dumps(session))
        return self.redirect('/')
class LogoutHandler(CasClientMixin, BaseHandler):
    """Log the current user out locally and at the CAS server."""

    def get(self):
        """Clear the session cookie and redirect.

        A logged-in user has the ``cas_attributes`` cookie cleared and is
        redirected to the CAS logout URL; anyone else is redirected to the
        ``next`` URL.
        """
        if self.current_user:
            self.clear_cookie('cas_attributes')
            cas_logout_url = self.get_logout_url('/logout')
            return self.redirect(cas_logout_url)

        return self.redirect(self.get_next_url())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment