Created
March 10, 2015 13:50
-
-
Save vincenting/aaa06ee9b4a384205b74 to your computer and use it in GitHub Desktop.
GitHub OAuth2 for Tornado.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import functools | |
import urllib.parse as urllib_parse | |
from tornado.concurrent import return_future | |
from tornado.auth import _auth_return_future, AuthError | |
from tornado import httpclient | |
from tornado.httputil import url_concat | |
from tornado import escape | |
class GithubOAuth2Mixin(object):
    """GitHub authentication using OAuth2, for Tornado request handlers.

    Mix this class into a ``tornado.web.RequestHandler``.  The handler's
    ``settings`` must contain a ``github_oauth`` dict whose ``key`` and
    ``secret`` entries are sent to GitHub as ``client_id`` and
    ``client_secret`` respectively.
    """

    _OAUTH_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
    _OAUTH_ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token"
    _OAUTH_USER_BASE_URL = "https://api.github.com"
    _OAUTH_SETTINGS_KEY = "github_oauth"

    @return_future
    def authorize_redirect(self, scopes=None, response_type="code", callback=None, **kwargs):
        """Redirect the user to GitHub's authorization page.

        :param scopes: optional iterable of scope names; sent comma-joined
            as the ``scope`` query argument.
        :param response_type: value of the ``response_type`` query argument.
        :param callback: invoked with no arguments once the redirect has
            been issued (normally supplied by ``return_future``).
        :param kwargs: extra query arguments merged into the authorize URL.
        """
        args = {
            "client_id": self.settings[self._OAUTH_SETTINGS_KEY]["key"],
            "response_type": response_type,
        }
        args.update(kwargs)
        if scopes:
            # Applied after ``kwargs`` so an explicit ``scopes`` always wins.
            args["scope"] = ",".join(scopes)
        self.redirect(url_concat(self._OAUTH_AUTHORIZE_URL, args))
        callback()

    @_auth_return_future
    def get_authenticated_user(self, code, callback):
        """Handles the login for the Github user, returning a user object.

        The returned future resolves to the decoded ``/user`` response with an
        added ``access_token`` key (plus ``private_emails`` when the
        ``user:email`` scope was granted), or to ``None`` when GitHub did not
        hand out an access token.

        :param code: the ``code`` argument GitHub appended to the redirect.
        :param callback: inside this method this is the future that
            ``_auth_return_future`` substituted for the caller's callback.

        Example usage::

            class GithubOAuth2LoginHandler(tornado.web.RequestHandler,
                                           GithubOAuth2Mixin):
                @tornado.gen.coroutine
                def get(self):
                    if self.get_argument("code", False):
                        user = yield self.get_authenticated_user(code=self.get_argument("code"))
                        # Save the user with e.g. set_secure_cookie
                    else:
                        yield self.authorize_redirect(scopes=["user:email"])
        """
        http = self.get_auth_http_client()
        body = urllib_parse.urlencode({
            "code": code,
            "client_id": self.settings[self._OAUTH_SETTINGS_KEY]["key"],
            "client_secret": self.settings[self._OAUTH_SETTINGS_KEY]["secret"],
        })
        http.fetch(
            self._OAUTH_ACCESS_TOKEN_URL,
            functools.partial(self._on_access_token, callback),
            method="POST",
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
            },
            body=body,
        )

    def _on_access_token(self, future, response):
        """Callback function for the exchange to the access token.

        Resolves ``future`` with ``None`` when the response carries no access
        token; otherwise continues by fetching ``/user``.
        """
        if response.error:
            future.set_exception(AuthError("Github auth error: %s" % str(response)))
            return

        args = escape.json_decode(escape.native_str(response.body))
        access_token = args.get("access_token")
        if not access_token:
            future.set_result(None)
            # Stop here: continuing would read a missing "scope" key and
            # try to resolve the same future a second time.
            return

        # The granted scopes come back as one comma-separated string.
        granted_scopes = (args.get("scope") or "").split(",")
        has_user_email_scope = "user:email" in granted_scopes

        self.github_request(
            path="/user",
            callback=functools.partial(
                self._on_get_user_info, future, access_token, has_user_email_scope),
            access_token=access_token,
        )

    def _on_get_user_info(self, future, access_token, has_user_email_scope, user):
        """Attach the access token to ``user`` and optionally fetch e-mails.

        ``user`` is ``None`` when the ``/user`` request did not yield a
        result, in which case ``future`` is resolved with ``None``.
        """
        if user is None:
            future.set_result(None)
            return

        user["access_token"] = access_token
        if not has_user_email_scope:
            future.set_result(user)
            return

        self.github_request(
            path="/user/emails",
            callback=functools.partial(self._on_get_user_email, future, user),
            access_token=access_token,
        )

    @staticmethod
    def _on_get_user_email(future, user, emails):
        """Store ``emails`` under ``private_emails`` and resolve ``future``."""
        user["private_emails"] = emails
        future.set_result(user)

    @_auth_return_future
    def github_request(self, path, callback, access_token=None, post_args=None, **args):
        """Fetch ``path`` from the GitHub API and decode the JSON response.

        :param path: API path appended to ``_OAUTH_USER_BASE_URL``.
        :param callback: inside this method this is the future that
            ``_auth_return_future`` substituted for the caller's callback.
        :param access_token: optional OAuth token; sent in the
            ``Authorization`` header.
        :param post_args: when not ``None``, a mapping sent form-encoded in a
            POST request; otherwise a GET request is made.
        :param args: extra query-string arguments.
        """
        url = self._OAUTH_USER_BASE_URL + path
        if args:
            url += "?" + urllib_parse.urlencode(args)

        headers = {"Accept": "application/json"}
        if access_token:
            # Keep the token out of the URL: GitHub no longer accepts it as a
            # query parameter, and the URL is echoed in error messages.
            headers["Authorization"] = "token " + access_token

        on_response = functools.partial(self._on_github_request, callback)
        http = self.get_auth_http_client()
        ua = "tornado"
        if post_args is not None:
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
                       callback=on_response, user_agent=ua, headers=headers)
        else:
            http.fetch(url, method="GET", callback=on_response, user_agent=ua,
                       headers=headers)

    @staticmethod
    def _on_github_request(future, response):
        """Resolve ``future`` with the decoded JSON body, or fail it with ``AuthError``."""
        if response.error:
            future.set_exception(AuthError("Error response %s fetching %s" %
                                           (response.error, response.request.url)))
            return
        future.set_result(escape.json_decode(response.body))

    @staticmethod
    def get_auth_http_client():
        """Return the HTTP client used for all requests to GitHub.

        Override this to use a client other than the default
        ``AsyncHTTPClient``.
        """
        return httpclient.AsyncHTTPClient()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment