@rpuntaie
Last active February 23, 2021 01:24
OAuth2 social login. First file: attempt to use just the data from django-allauth. Second file: social-core based solution actually adopted.

Python OAuth Libraries

Googling for OAuth for Python returns many results, but the relevant ones, i.e. up-to-date, independent, authoritative implementations, do not show up immediately. So it took me longer than necessary to adopt the right library. Here is my conclusion about the current state, to possibly shorten the search for OAuth newcomers.

OAuth Libraries

There are two major, unrelated libraries implementing the protocol: oauthlib and authlib.

Social Login Libraries

My need was just for "login with ...", i.e. social login, as an alternative to the username/password auth flow.

I found older libraries, but since OAuth2 is from 2012 and new OAuth providers have been emerging gradually, only new and actively maintained libraries matter.

The first such library for social login that I found was https://github.com/authlib/loginpass, based on authlib. Only later did I realize that I had better build on oauthlib.

Others were

At that point I thought there was no library doing just social login, without further dependencies. I actually wanted to extract just the provider data from django-allauth, and base the flow on https://github.com/requests/requests-oauthlib (first file of this gist).

But, browsing through libraries using requests-oauthlib on GitHub, I came across https://github.com/python-social-auth/social-core. social-core is not Django-specific. It has a Django history, though, which is possibly the reason for its framework design. If you just want the raw flow, you need to stub the classes it expects. I did so and included it in the gist (second file).

Appendix: OAuth

An OAuth provider

  • authenticates the user and
  • asks the user to authorize a client app
  • to access the user's resources according to scopes (e.g. profile scope, email scope)

One needs to register an app at the OAuth provider to get a

  • client_id and
  • client_secret
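
A minimal sketch of how a client app might pick up these two values without hard-coding them. It assumes the environment variable convention used by the first file of this gist (`<PROVIDER>_CLIENT_ID` and `<PROVIDER>_CLIENT_SECRET`); the helper name `load_client_credentials` is hypothetical.

```python
import os


def load_client_credentials(provider, environ=None):
    """Return (client_id, client_secret) for a provider such as 'local'.

    Assumption: credentials live in <PROVIDER>_CLIENT_ID and
    <PROVIDER>_CLIENT_SECRET, as in the first file of this gist.
    """
    environ = os.environ if environ is None else environ
    prefix = provider.upper()
    try:
        return (environ[f"{prefix}_CLIENT_ID"],
                environ[f"{prefix}_CLIENT_SECRET"])
    except KeyError as missing:
        # Fail early instead of starting a flow that cannot succeed.
        raise RuntimeError(f"missing environment variable {missing}") from None
```

Passing a plain dict as `environ` keeps the helper testable without touching the real process environment.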

Some client apps only need a user ID, to associate information with that user, without ever needing to email that user. In this case the openid scope is enough. It just returns an ID for the user in that social network. The same person then has as many identities in the app as social accounts they use to log in.

A client app might further want to identify a user beyond their social account, e.g. by their email, in which case several social accounts can map to the same email. The email identifies a person (n-1: many accounts, one person), just like the social account does, but also allows the app to inform the user about changes, if necessary.
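
The difference between the two ways of identifying a user can be sketched as follows. The function `identity_key` and the sample profiles are hypothetical, and lower-casing the email before comparing is an assumption of this sketch, not something a provider guarantees.

```python
def identity_key(provider, profile, by_email=False):
    """Build the key under which an app stores data for a logged-in user.

    by_email=False: one identity per social account (provider + provider ID).
    by_email=True:  one identity per email, shared across social accounts.
    """
    if by_email:
        # Assumption: emails are compared case-insensitively.
        return ("email", profile["email"].strip().lower())
    return (provider, str(profile["id"]))


# Hypothetical profiles of the same person at two providers.
google_profile = {"id": 1017, "email": "Ann@example.org"}
facebook_profile = {"id": "fb-55", "email": "ann@example.org"}

separate = {identity_key("google", google_profile),
            identity_key("facebook", facebook_profile)}
merged = {identity_key("google", google_profile, by_email=True),
          identity_key("facebook", facebook_profile, by_email=True)}
```

Here `separate` holds two identities while `merged` holds one, which is the n-1 relation described above.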

openid comes from the OpenID Connect (OIDC) spec, an extension built on top of OAuth2; for social login it can practically be treated as part of OAuth2. OAuth1a is older and a bit more complicated, because it does not rely on HTTPS. Both use tokens representing a temporary grant. Of the following steps, the last three occur for both OAuth1 and OAuth2, but the HTTP request parameters differ. Also, OAuth1 needs request signing, while OAuth2 relies on HTTPS.

  1. OAuth1 only: get an unauthorized request token from the request URL, to authorize it in the next step.
  2. Visit the authorization URL. The authorization token (in OAuth2: the authorization code) for a scope is returned via a 302 redirect to the provided callback_uri.
  3. Retrieve an access token from the access token URL. The access token is often a JWT (<header>.<payload>.<signature>), but it can also be an opaque string.
  4. Retrieve information according to the scope from the API URL, providing Authorization: Bearer <access token> in the HTTP Authorization header. See Bearer Token.
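
For OAuth2, steps 2 to 4 can be sketched with the standard library alone, without sending any request. The parameter names (`response_type`, `client_id`, `redirect_uri`, `scope`, `state`, `grant_type`, `code`) are those of the OAuth2 authorization code grant; the function names are hypothetical, and a real app would rather let requests-oauthlib do this, as the files in this gist do.

```python
import secrets
from urllib.parse import urlencode, urlparse, parse_qs


def build_authorization_url(authorize_url, client_id, callback_uri, scope):
    """Step 2: URL to redirect the browser to, plus the state to remember."""
    state = secrets.token_urlsafe(16)  # random value to detect forged callbacks
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": callback_uri,
        "scope": scope,
        "state": state,
    })
    return f"{authorize_url}?{query}", state


def parse_callback(response_uri, expected_state):
    """Step 2, on return: extract the authorization code from the callback."""
    params = parse_qs(urlparse(response_uri).query)
    if "error" in params:
        raise ValueError(f"provider returned error: {params['error'][0]}")
    if params.get("state", [None])[0] != expected_state:
        raise ValueError("state mismatch")
    return params["code"][0]


def token_request_fields(code, callback_uri, client_id, client_secret):
    """Step 3: form fields to POST to the access token URL."""
    return {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": callback_uri,
        "client_id": client_id,
        "client_secret": client_secret,
    }


def bearer_header(access_token):
    """Step 4: header to send along with requests to the API URL."""
    return {"Authorization": f"Bearer {access_token}"}
```

Sending the client secret as a form field corresponds to the client_secret_post setting used for the local test server; other providers may expect HTTP basic auth instead.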

The linked specs are the best source for the details.
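
When the access token of step 3 is a JWT, its header and payload are base64url-encoded JSON and can be inspected as sketched below. This only peeks at the content and does not verify the signature, so it is not a substitute for a JWT library; the helper names and the sample claims are hypothetical.

```python
import base64
import json


def b64url_decode(segment):
    """Decode a base64url segment, restoring the padding JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(raw):
    """Encode bytes as base64url without padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def peek_jwt(token):
    """Return (header, payload) of a JWT WITHOUT checking the signature."""
    header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(header)), json.loads(b64url_decode(payload))


# Hypothetical token assembled by hand, just to have something to decode.
sample = ".".join([
    b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()),
    b64url_encode(json.dumps({"sub": "1234", "scope": "profile email"}).encode()),
    "signature-not-checked",
])
header, payload = peek_jwt(sample)
```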

Some OAuth providers also accompany the access token with a refresh token that can be used to retrieve a new access token when the old one has expired.

Some OAuth providers provide the email in a basic scope; others provide it in a more sensitive scope and might even do a human review of the app just for a "login with ...", like Microsoft's LinkedIn.
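
A sketch of the decision a client has to make before using a stored token. It assumes the token is a dict with an `expires_at` timestamp and a `refresh_token` entry, similar to the token dict shown in the comments of the first file; the function names and the 30 second leeway are hypothetical. requests-oauthlib can do this automatically via `auto_refresh_url`, as the first file does.

```python
import time


def needs_refresh(token, now=None, leeway=30):
    """True if the access token has expired or expires within `leeway` seconds."""
    now = time.time() if now is None else now
    return token["expires_at"] - leeway <= now


def refresh_request_fields(token, client_id, client_secret):
    """Form fields to POST to the token URL to obtain a new access token."""
    return {
        "grant_type": "refresh_token",
        "refresh_token": token["refresh_token"],
        "client_id": client_id,
        "client_secret": client_secret,
    }
```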

#!/usr/bin/env python
"""
My minimal OAuth2 social login library attempt before I found
https://github.com/python-social-auth/social-core.
The latter has a framework design (inversion of control).
This approach here has a library design.

Tested with https://github.com/authlib/example-oauth2-server.

There are social login packages out there:

    https://github.com/python-social-auth/social-core (authoritative oauthlib-based implementation)
    https://github.com/pennersr/django-allauth (django-specific)
    https://github.com/rpuntaie/loginpass (authlib-based instead of oauthlib)
    tornado/tornado/auth.py (tornado-specific)
    (others are either deprecated or unmaintained as of 2020)

But one might want to limit dependency, and just take the provider data.

To test:

CLI1::

    cd example-oauth2-server
    rm website/sqlite.db
    flask run
    #in browser:
    #enter username
    #create client (https://user-images.githubusercontent.com/290496/38811988-081814d4-41c6-11e8-88e1-cb6c25a6f82e.png)
    #  Local,http://localhost:8080,profile email,http://localhost:8080/auth/local/callback
    #  authorization_code, code, client_secret_post
    #copy to export client_id and client_secret in CLI2 (see below)

CLI2::

    export OAUTHLIB_INSECURE_TRANSPORT=1
    #as given on the example-oauth2-server page
    export LOCAL_CLIENT_ID="92pSnHmecuU9F66SXutV5A1h"
    export LOCAL_CLIENT_SECRET="1FuSRKIM6VKzilZcQm0mfRss0pMtX9f93FrTsD2A17aJlcgN"
    cd ~/tmp
    ./test_oauth2.py
"""
# OAuth2
import os
from requests_oauthlib import OAuth2Session

class ProviderBase:
    # Subclasses must define: id, name, authorize_url, access_token_url, profile_url
    Flow = OAuth2Session
    params = {}
    scope = None

    def __init__(self, client_id=None, client_secret=None):
        # Fall back to <ID>_CLIENT_ID / <ID>_CLIENT_SECRET from the environment
        PROVIDER = self.id.upper()
        try:
            self.client_id = client_id or os.environ[f'{PROVIDER}_CLIENT_ID']
        except KeyError:
            self.client_id = client_id
        try:
            self.client_secret = client_secret or os.environ[f'{PROVIDER}_CLIENT_SECRET']
        except KeyError:
            self.client_secret = client_secret
        assert self.client_id
        assert self.client_secret

    def authorize(self, callback_uri):
        """Call this in your login handler

        :param callback_uri: of your app to handle provider callback
        :return url,state: Save the state as e.g. a cookie for ``authorized()`` in the callback_uri handler.

        Redirect the browser to the returned provider URL.
        The provider will redirect to callback_uri.
        The provider adds parameters to callback_uri before redirecting.
        """
        self.flow = self.Flow(self.client_id
                ,redirect_uri=callback_uri
                ,scope=self.scope
                )
        url, state = self.flow.authorization_url(self.authorize_url
                , **self.params
                )
        return url, state

    def authorized(self, callback_uri, response_uri, state, token_updater):
        """Call this in the redirect_uri handler

        :param token_updater: function that stores a refreshed access token in e.g. a cookie
        :return: the normalized profile info, or None if the login failed or was denied
        """
        self.flow = self.Flow(self.client_id
                ,state=state
                ,redirect_uri=callback_uri
                )
        try:
            access_jwt = self.flow.fetch_token(self.access_token_url
                    ,authorization_response=response_uri
                    ,client_secret=self.client_secret
                    )
            #access_jwt = {'access_token': 'D447qOKuh0v48OVTNQ7sj2Bj1uux0Kk5AA01kOFbM5',
            # 'expires_in': 864000, 'scope': ['profile', 'email'], 'token_type': 'Bearer', 'expires_at': 1582360976.518611}
            token_updater(access_jwt)
            info = self.info()
            return info
        except Exception:
            # any failure is treated like a denied login by the caller
            return None

    def info(self, access_jwt=None, token_updater=None):
        """Call this to get info.
        Provide parameters only if at a later request, i.e. not immediately after ``authorized()``.

        :param access_jwt: as returned by ``authorized``, restored e.g. from a cookie
        :param token_updater: function that stores a refreshed access token in e.g. a cookie
        """
        if access_jwt is not None:
            self.flow = self.Flow(self.client_id
                    ,token=access_jwt
                    ,auto_refresh_url=self.refresh_url
                    ,auto_refresh_kwargs=self.params
                    ,token_updater=token_updater
                    )
        try:
            profile_urls = self.profile_url if isinstance(self.profile_url,list) else [self.profile_url]
            info = {}
            for profile_url in profile_urls:
                if callable(profile_url):
                    profile_url = profile_url(**self.flow.token)
                profile = self.flow.get(profile_url)
                info.update(self.normalize(profile.json()))
            return info
        except Exception:
            return None

    def normalize(self, data):
        return dict(data)

    @property
    def refresh_url(self):
        return self.access_token_url

class Local(ProviderBase):
    id = 'local'
    name = 'Local'
    scope = 'profile email'

    def normalize(self, data):
        res = dict(data)
        res['unique_id'] = str(data['id'])
        res['name'] = data['username']
        res['email'] = data['username']+'@local.org'
        return res

    access_token_url = 'http://localhost:5000/oauth/token'
    authorize_url = 'http://localhost:5000/oauth/authorize'
    profile_url = 'http://localhost:5000/api/me'

#only those where os.environ['XXX_CLIENT_ID/SECRET'] is set will get in
social_logins = {}
for social in ['local']: #'google facebook linkedin instagram twitter pinterest'.split():
    try:
        social_logins[social]=globals()[social[0].upper()+social[1:]]()
    except Exception:
        # provider class missing or credentials not configured: skip it
        pass

# example using bottle
import bottle
import datetime
from urllib.parse import urljoin

@bottle.route('/auth/<provider>')
def auth_login(provider):
    try:
        callback_uri = urljoin(bottle.request.url, f'/auth/{provider}/callback')
        provider_url, state = social_logins[provider].authorize(
                callback_uri = callback_uri
                )
    except Exception as e:
        raise bottle.HTTPError(404, f"Login via {provider} not supported: "+str(e))
    bottle.response.set_cookie('test_oauth2_login_state',state,
            httponly=True,path='/',samesite='strict',max_age=datetime.timedelta(days=30))
    bottle.redirect(provider_url)
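
import jwt as pyjwt
test_oauth2_secret='test_oauth2_secret'

def jwtcode(jwt):
    """Encode a token dict to a signed string, or decode such a string back to a dict."""
    #jwt = {'access_token': 'D447qOKuh0v48OVTNQ7sj2Bj1uux0Kk5AA01kOFbM5',
    # 'expires_in': 864000, 'scope': ['profile', 'email'], 'token_type': 'Bearer', 'expires_at': 1582360976.518611}
    if isinstance(jwt, dict):
        encoded = pyjwt.encode(jwt,test_oauth2_secret,algorithm='HS256')
        # PyJWT < 2 returns bytes, PyJWT >= 2 returns str
        return encoded.decode() if isinstance(encoded, bytes) else encoded
    # PyJWT >= 2 requires the accepted algorithms to be named explicitly
    return pyjwt.decode(jwt,test_oauth2_secret,algorithms=['HS256'])

def todo_db_jwt_email_insert(jwt,email):
    return jwtcode(jwt)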

from functools import partial

@bottle.route('/auth/<provider>/callback')
def auth_callback(provider):
    token_inserter = todo_db_jwt_email_insert
    def token_updater(jwt):
        # bind the received token; the email is supplied once the profile is known
        nonlocal token_inserter
        token_inserter = partial(token_inserter,jwt)
    callback_uri = urljoin(bottle.request.url, f'/auth/{provider}/callback')
    info = social_logins[provider].authorized(
            callback_uri = callback_uri
            ,response_uri=bottle.request.url
            ,state=bottle.request.get_cookie('test_oauth2_login_state')
            ,token_updater=token_updater
            )
    if info is None: #assuming denied: just continue without login
        bottle.redirect('/')
    fullname = f'{info["name"]}({social_logins[provider].name})'
    token = token_inserter(info['email'])
    bottle.response.set_cookie('test_oauth2_user',token,
            httponly=True,path='/',samesite='strict',max_age=datetime.timedelta(days=30))
    bottle.redirect('/')

@bottle.route('/logout')
def logout():
    bottle.response.delete_cookie('test_oauth2_user')
    bottle.redirect('/')

@bottle.route('/')
def start():
    try:
        jwt = jwtcode(bottle.request.get_cookie('test_oauth2_user'))
    except Exception:
        # no cookie or invalid token: treat as not logged in
        jwt = ""
    if jwt:
        return f"""<html>
<body>
<a href="/logout">logout {jwt['access_token']}</a>
</body>
</html>
"""
    else:
        return """<html>
<body>
<a href="/auth/local">login</a>
</body>
</html>
"""

if __name__=='__main__':
    bottle.run()

#!/usr/bin/env python
"""
https://gist.github.com/rpuntaie/7148c2295e0852a98e219a211b812e79

Social-core
https://github.com/python-social-auth/social-core
has a framework design.
This example stubs the wanted callback classes to just use the authentication.

CLI1::

    cd /mnt/src/example-oauth2-server
    rm website/sqlite.db
    flask run
    #in browser: localhost:5000
    #enter username
    #create client:
    #  local, http://localhost:8080, profile email, http://localhost:8080/auth/local/callback
    #  authorization_code, code, client_secret_post

CLI2::

    export OAUTHLIB_INSECURE_TRANSPORT=1
    export SOCIAL_AUTH_LOCAL_KEY="92pSnHmecuU9F66SXutV5A1h"
    export SOCIAL_AUTH_LOCAL_SECRET="1FuSRKIM6VKzilZcQm0mfRss0pMtX9f93FrTsD2A17aJlcgN"
    cd ~/tmp
    ./test_social_core.py
"""
import sys
# make this file importable as ``test_social_core`` (see SOCIAL_AUTH_PIPELINE below)
if sys.path[0]!='.':
    sys.path.insert(0,'.')
import os
import bottle
import datetime
from functools import lru_cache

@lru_cache()
def test_social_core_secret():
    try:
        secret = os.environ['test_social_core_secret']
    except KeyError:
        secret = 'test_social_core_secret'
    return secret

######################################################################################BEGIN
# copied from backends/google.py and modified to work with local example-oauth2-server
# not needed when importing backends from social_core
from social_core.backends.oauth import BaseOAuth2

class BaseLocalAuth(object):
    def get_user_id(self, details, response):
        """Use google email as unique id"""
        if self.setting('USE_UNIQUE_USER_ID', False):
            if 'sub' in response:
                return response['sub']
            else:
                return response['id']
        else:
            return details['email']

    def get_user_details(self, response):
        """Return user details from Local API account"""
        if 'email' in response:
            email = response['email']
        else:
            email = ''
        name, given_name, family_name = (
            response.get('name', ''),
            response.get('given_name', ''),
            response.get('family_name', ''),
        )
        fullname, first_name, last_name = self.get_user_names(
            name, given_name, family_name
        )
        return {'username': email.split('@', 1)[0],
                'email': email,
                'fullname': fullname,
                'first_name': first_name,
                'last_name': last_name}

class BaseLocalOAuth2API(BaseLocalAuth):
    def user_data(self, access_token, *args, **kwargs):
        """Return user data from Local API"""
        return self.get_json(
            'http://localhost:5000/api/me',
            headers={
                'Authorization': 'Bearer %s' % access_token,
            },
        )

    def revoke_token_params(self, token, uid):
        return {'token': token}

    def revoke_token_headers(self, token, uid):
        return {'Content-type': 'application/json'}

class local(BaseLocalOAuth2API, BaseOAuth2):
    name = 'local'
    REDIRECT_STATE = False
    AUTHORIZATION_URL = 'http://localhost:5000/oauth/authorize'
    ACCESS_TOKEN_URL = 'http://localhost:5000/oauth/token'
    ACCESS_TOKEN_METHOD = 'POST'
    REVOKE_TOKEN_URL = 'http://localhost:5000/oauth/revoke'
    REVOKE_TOKEN_METHOD = 'GET'
    DEFAULT_SCOPE = ['email', 'profile']
    EXTRA_DATA = [
        ('refresh_token', 'refresh_token', True),
        ('expires_in', 'expires'),
        ('token_type', 'token_type', True)
    ]
# copied from backends/google.py and modified to work with local example-oauth2-server
######################################################################################END
######################################################################################BEGIN
# adapters to minimally satisfy social-core's framework like design
from social_core.strategy import BaseStrategy
from social_core.storage import UserMixin, BaseStorage
from urllib.parse import urljoin
from functools import wraps

# secrets are included via os.environ
social_core_setting = {
    'SOCIAL_AUTH_SANITIZE_REDIRECTS': False
    # sign in with linkedin is not automatic but needs reviewing!
    # ,'SOCIAL_AUTH_LINKEDIN_OAUTH2_FIELD_SELECTORS': ['emailAddress']
    # ,'SOCIAL_AUTH_LINKEDIN_OAUTH2_EXTRA_DATA':[('id', 'id'),
    #                                            ('firstName', 'first_name'),
    #                                            ('lastName', 'last_name'),
    #                                            ('emailAddress', 'email_address')]
    # including email produces a popup, without not
    # ,'SOCIAL_AUTH_FACEBOOK_SCOPE': ['email']
    # ,'SOCIAL_AUTH_FACEBOOK_PROFILE_EXTRA_PARAMS': {
    #   'fields': 'id, name, email'
    # }
    ,'SOCIAL_AUTH_FIELDS_STORED_IN_SESSION': []
    ,'SOCIAL_AUTH_PIPELINE': ('social_core.pipeline.social_auth.social_details'
                              ,'social_core.pipeline.social_auth.social_uid'
                              ,'social_core.pipeline.social_auth.auth_allowed'
                              ,'test_social_core.social_user'
                              )
}

from social_core.utils import setting_name
from social_core.backends.google import GoogleOAuth2 as google
from social_core.backends.facebook import FacebookOAuth2 as facebook
from social_core.backends.linkedin import LinkedinOAuth2 as linkedin
from social_core.backends.instagram import InstagramOAuth2 as instagram
from social_core.backends.twitter import TwitterOAuth as twitter
from social_core.backends.pinterest import PinterestOAuth2 as pinterest

social_logins = {}
#of these only google and facebook tested, and here in this file only local
for social in 'local google facebook linkedin instagram twitter pinterest'.split():
    try:
        sli=globals()[social]
        for suffix in ['KEY','SECRET']:
            #'SOCIAL_AUTH_'+sli.name.upper().replace('-','_')+'_'+suffix
            envkey = setting_name(sli.name,suffix)
            social_core_setting[envkey] = os.environ[envkey]
        social_logins[social] = sli
    except Exception:
        # credentials for this provider are not in the environment: skip it
        pass
print(social_logins)

def social_login_name(cls):
    for k,v in social_logins.items():
        if v.__name__ == cls.__name__:
            return k

class UserModel(UserMixin):
    @classmethod
    def user_model(cls):
        return tuple # type != dict

class storage_for_social_core(BaseStorage):
    user = UserModel

class strategy_for_social_core(BaseStrategy):
    def __init__(self, storage=None, tpl=None):
        super().__init__(storage,tpl)
        self.save = {}

    def get_setting(self, name):
        return social_core_setting[name]

    def request_data(self, merge=True):
        request = bottle.request
        if merge:
            data = request.params
        elif request.method == 'POST':
            data = request.forms
        else:
            data = request.query
        return data

    def redirect(self, url):
        return bottle.redirect(url)

    def session_get(self, name, default=None):
        nn = 'test_social_core_'+name
        # don't return deleted cookies
        if nn in self.save and self.save[nn]=='':
            sessval = None
        else:
            sessval = bottle.request.get_cookie(nn,secret=test_social_core_secret())
        # save needed due to session_setdefault() in social_core/strategy.py
        if sessval is None and nn in self.save:
            sessval = self.save[nn]
        return sessval

    def session_set(self, name, value):
        nn = 'test_social_core_'+name
        self.save[nn] = value
        bottle.response.set_cookie(nn,value,secret=test_social_core_secret(),max_age=datetime.timedelta(days=30))

    def session_pop(self, name):
        nn = 'test_social_core_'+name
        try:
            self.save[nn]=''
        except KeyError:
            pass
        bottle.response.delete_cookie(nn)

    def build_absolute_uri(self, path=None):
        return urljoin(bottle.request.url,path or '')

def make_backend_obj():
    """Decorator factory: replace the ``provider`` route argument with a social-core backend object."""
    def decorator(func):
        @wraps(func)
        def wrapper(provider, *args, **kwargs):
            try:
                Backend = social_logins[provider]
                strategy = strategy_for_social_core(storage_for_social_core)
                uri = urljoin(bottle.request.url, f'/auth/{provider}/callback')
                backend = Backend(strategy, redirect_uri=uri)
                return func(backend, *args, **kwargs)
            except KeyError:
                bottle.redirect('/')
        return wrapper
    return decorator

#this is called via social_core (see PIPELINE)
def social_user(backend, uid, user=None, *args, **kwargs):
    info = kwargs['details']
    print(info)#
    info['social'] = social_login_name(backend.__class__)
    print(info)#
    bottle.response.set_cookie('test_social_core_user',info,secret=test_social_core_secret(),max_age=datetime.timedelta(days=30),path='/')
    #satisfy social_core: invert framework to library
    class AttributeDict(dict):
        __getattr__ = dict.__getitem__
        __setattr__ = dict.__setitem__
    kwargs['user'] = AttributeDict()
    kwargs['social'] = user
    kwargs['is_new'] = None
    kwargs['user'].social = user
    return kwargs
# adapters to minimally satisfy social-core's framework like design
######################################################################################END
######################################################################################BEGIN
# app with bottle
from social_core.exceptions import SocialAuthBaseException
from social_core.actions import do_auth, do_complete

@bottle.route('/auth/<provider>', method=('GET', 'POST'))
@make_backend_obj()
def auth_login(backend):
    try:
        do_auth(backend)
    except SocialAuthBaseException:
        bottle.redirect('/')

@bottle.route('/auth/<provider>/callback', method=('GET', 'POST'))
@make_backend_obj()
def auth_callback(backend):
    try:
        user = do_complete(backend, login=None)
    except SocialAuthBaseException:
        pass
    bottle.redirect('/')

@bottle.route('/logout')
def logout():
    bottle.response.delete_cookie('test_social_core_user')
    bottle.redirect('/')

@bottle.route('/')
def start():
    try:
        info = bottle.request.get_cookie('test_social_core_user',secret=test_social_core_secret())
    except Exception:
        # unreadable cookie: treat as not logged in
        info = {}
    if info:
        return f"""<html>
<body>
<a href="/logout">logout {info['social']}</a>
</body>
</html>
"""
    else:
        return """<html>
<body>
<a href="/auth/local">login</a>
</body>
</html>
"""

if __name__=='__main__':
    bottle.run()