Last active
February 22, 2016 19:56
-
-
Save ysimonson/5877284 to your computer and use it in GitHub Desktop.
LinkedIn OAuth2 authentication over tornado
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tornado import auth, httpclient, httputil, escape | |
import urllib.request | |
import urllib.parse | |
import urllib.error | |
import functools | |
REQUEST_TIMEOUT = 20.0 | |
class LinkedInMixin(auth.OAuth2Mixin): | |
""" | |
LinkedIn authentication using OAuth2. | |
Example usage:: | |
class LinkedInLoginHandler(LoginHandler, LinkedInMixin): | |
@tornado.gen.coroutine | |
def get(self): | |
code = self.get_argument("code", None) | |
redirect_uri = "%s://%s%s" % (self.request.protocol, self.request.host, self.request.path) | |
if not code: | |
# Generate a random state | |
state = binascii.b2a_hex(os.urandom(15)) | |
self.set_secure_cookie("linkedin_state", state) | |
yield self.authorize_redirect( | |
redirect_uri=redirect_uri, | |
client_id=self.settings["linkedin_client_id"], | |
extra_params={ | |
"response_type": "code", | |
"state": state, | |
"scope": "r_basicprofile r_emailaddress" | |
} | |
) | |
return | |
# Validate the state | |
if self.get_argument("state", None) != self.get_secure_cookie("linkedin_state"): | |
raise tornado.web.HTTPError(400, "Invalid state") | |
user_data = yield self.get_authenticated_user( | |
redirect_uri=redirect_uri, | |
client_id=self.settings["linkedin_client_id"], | |
client_secret=self.settings["linkedin_client_secret"], | |
code=code, | |
extra_fields=["formatted-name", "email-address"] | |
) | |
if not user_data: | |
raise tornado.web.HTTPError(400, "LinkedIn authentication failed") | |
# Handle authenticated user | |
""" | |
_OAUTH_ACCESS_TOKEN_URL = "https://www.linkedin.com/uas/oauth2/accessToken?" | |
_OAUTH_AUTHORIZE_URL = "https://www.linkedin.com/uas/oauth2/authorization?" | |
_OAUTH_NO_CALLBACKS = False | |
@auth._auth_return_future # pylint: disable=protected-access | |
def get_authenticated_user(self, redirect_uri, client_id, client_secret, code, callback, extra_fields=None): | |
http = httpclient.AsyncHTTPClient() | |
args = { | |
"redirect_uri": redirect_uri, | |
"code": code, | |
"client_id": client_id, | |
"client_secret": client_secret, | |
"extra_params": { | |
"grant_type": "authorization_code" | |
} | |
} | |
fields = set(['id']) | |
if extra_fields: | |
fields.update(extra_fields) | |
http.fetch( | |
self._oauth_request_token_url(**args), | |
functools.partial(self._on_access_token, redirect_uri, client_id, client_secret, callback, fields), | |
method="POST", body="", request_timeout=REQUEST_TIMEOUT | |
) | |
def _on_access_token(self, redirect_uri, client_id, client_secret, future, fields, response): # pylint: disable=unused-argument | |
if response.error: | |
self._set_error(future, 'LinkedIn auth error (%s): %s' % (response.code, response.body), response) | |
return | |
args = escape.json_decode(response.body) | |
expires_in = args["expires_in"] | |
access_token = args["access_token"] | |
self.linkedin_request( | |
path="/v1/people/~:(%s)" % ",".join(fields), | |
callback=functools.partial(self._on_get_user_info, future, expires_in, access_token), | |
access_token=access_token, | |
) | |
def _on_get_user_info(self, future, expires_in, access_token, user): | |
if user is None: | |
future.set_result(None) | |
return | |
user["access_token"] = access_token | |
user["expires_in"] = expires_in | |
future.set_result(user) | |
def _set_error(self, future, message, response): | |
e = auth.AuthError(message) | |
e.code = response.code | |
e.url = response.request.url | |
e.body = response.body | |
future.set_exception(e) | |
@auth._auth_return_future # pylint: disable=protected-access | |
def linkedin_request(self, path, callback, method="GET", access_token=None, post_args=None, query_args=None): | |
url = "https://api.linkedin.com" + path | |
# Build the query parameters | |
all_query_args = dict(query_args or {}) | |
if access_token: | |
all_query_args["oauth2_access_token"] = access_token | |
if all_query_args: | |
url += "?" + urllib.parse.urlencode(all_query_args) | |
# Build the request body. Empty bodies must be either set to an empty | |
# string or None based on the request method. This is because the | |
# Tornado HTTP client aggressively throws errors based on the request | |
# method / body content combination. | |
if self.request.method in ("POST", "PATCH", "PUT"): | |
if post_args: | |
body = urllib.parse.urlencode(post_args) | |
else: | |
body = "" | |
else: | |
body = None | |
http = httpclient.AsyncHTTPClient() | |
# Ask linkedin to send us JSON on all API calls (not xml) | |
headers = httputil.HTTPHeaders({"x-li-format": "json"}) | |
http.fetch( | |
url, | |
callback=functools.partial(self._on_linkedin_request, callback), | |
method=method, headers=headers, body=body, request_timeout=REQUEST_TIMEOUT | |
) | |
def _on_linkedin_request(self, future, response): | |
if response.error: | |
self._set_error(future, "LinkedIn error (%s) when requesting %s: %s" % (response.code, response.request.url, response.body), response) | |
else: | |
future.set_result(escape.json_decode(response.body)) |
Didn't see this comment earlier. It was tested/used in production until recently. With tornado 4.x this will no longer work, as async_callback
is removed.
Updated the snippet to work on tornado 4.x. I'll continue to update it if other issues arise.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
looks like decent code... Is it tested?