|
#!/usr/bin/env python |
|
|
|
""" |
|
https://gist.github.com/rpuntaie/7148c2295e0852a98e219a211b812e79 |
|
|
|
Social-core |
|
https://github.com/python-social-auth/social-core |
|
has a framework design. |
|
|
|
This example stubs the wanted callback classes to just use the authentication. |
|
|
|
CLI1:: |
|
/mnt/src/example-oauth2-server |
|
rm website/sqlite.db |
|
flask run |
|
#in browser: localhost:5000 |
|
#enter username |
|
#create client: |
|
# local, http://localhost:8080, profile email, http://localhost:8080/auth/local/callback |
|
# authorization_code, code, client_secret_post |
|
CLI2:: |
|
export OAUTHLIB_INSECURE_TRANSPORT=1 |
|
export SOCIAL_AUTH_LOCAL_KEY="92pSnHmecuU9F66SXutV5A1h" |
|
export SOCIAL_AUTH_LOCAL_SECRET="1FuSRKIM6VKzilZcQm0mfRss0pMtX9f93FrTsD2A17aJlcgN" |
|
cd ~/tmp |
|
./test_social_core.py |
|
""" |
|
|
|
import sys |
|
# Put the current directory first on the import path.
# NOTE(review): presumably so that 'test_social_core.social_user' named in
# SOCIAL_AUTH_PIPELINE can be imported by module path -- confirm.
if sys.path[0] != '.':
    sys.path[:0] = ['.']
|
|
|
import os |
|
import bottle |
|
import datetime |
|
|
|
|
|
from functools import lru_cache |
|
@lru_cache()
def test_social_core_secret():
    """Return the secret used to sign this example's cookies.

    The value is read from the ``test_social_core_secret`` environment
    variable; when that variable is not set, the literal fallback
    ``'test_social_core_secret'`` is returned instead.

    The result is memoized by ``lru_cache``, so the environment is only
    consulted on the first call.
    """
    # A plain lookup with a default replaces the former bare ``except:``,
    # which would also have swallowed unrelated errors.
    return os.environ.get('test_social_core_secret', 'test_social_core_secret')
|
|
|
######################################################################################BEGIN |
|
# copied from backends/google.py and modified to work with local example-oauth2-server |
|
# not needed when importing backends from social_core |
|
from social_core.backends.oauth import BaseOAuth2 |
|
class BaseLocalAuth(object):
    """Mixin that derives the user id and user details from an API response.

    It calls ``self.setting()`` and ``self.get_user_names()``, which must be
    provided by the class this mixin is combined with.
    """

    def get_user_id(self, details, response):
        """Return the value used as the unique user id.

        When the ``USE_UNIQUE_USER_ID`` setting is truthy the id is
        ``response['sub']`` if that key exists, otherwise ``response['id']``.
        When the setting is falsy (the default) it is ``details['email']``.
        """
        if not self.setting('USE_UNIQUE_USER_ID', False):
            return details['email']
        id_key = 'sub' if 'sub' in response else 'id'
        return response[id_key]

    def get_user_details(self, response):
        """Return a dict of user details built from the Local API response.

        The dict has the keys ``username`` (the part of the email before the
        first ``@``), ``email``, ``fullname``, ``first_name`` and
        ``last_name``.  Missing response fields are treated as ``''``.
        """
        email = response['email'] if 'email' in response else ''
        raw_names = [
            response.get(key, '')
            for key in ('name', 'given_name', 'family_name')
        ]
        fullname, first_name, last_name = self.get_user_names(*raw_names)
        return {
            'username': email.split('@', 1)[0],
            'email': email,
            'fullname': fullname,
            'first_name': first_name,
            'last_name': last_name,
        }
|
class BaseLocalOAuth2API(BaseLocalAuth):
    """API calls against the local example OAuth2 server on localhost:5000.

    ``self.get_json()`` must be provided by the class this is combined with.
    """

    def user_data(self, access_token, *args, **kwargs):
        """Return user data from the Local API.

        Issues ``get_json`` on the ``/api/me`` endpoint, passing
        *access_token* as a Bearer token in the ``Authorization`` header.
        Extra positional and keyword arguments are accepted but unused.
        """
        bearer_headers = {'Authorization': 'Bearer %s' % access_token}
        return self.get_json(
            'http://localhost:5000/api/me',
            headers=bearer_headers,
        )

    def revoke_token_params(self, token, uid):
        """Return the parameters for a token revocation request (*uid* unused)."""
        return dict(token=token)

    def revoke_token_headers(self, token, uid):
        """Return the headers for a token revocation request (arguments unused)."""
        headers = {'Content-type': 'application/json'}
        return headers
|
class local(BaseLocalOAuth2API, BaseOAuth2):
    """OAuth2 backend definition for the example server on localhost:5000."""

    # Backend name; it is also passed to setting_name() when the
    # credentials are read from the environment.
    name = 'local'

    # Endpoints of the local example-oauth2-server.
    AUTHORIZATION_URL = 'http://localhost:5000/oauth/authorize'
    ACCESS_TOKEN_URL = 'http://localhost:5000/oauth/token'
    REVOKE_TOKEN_URL = 'http://localhost:5000/oauth/revoke'

    # HTTP methods used for the token endpoints.
    ACCESS_TOKEN_METHOD = 'POST'
    REVOKE_TOKEN_METHOD = 'GET'

    REDIRECT_STATE = False
    DEFAULT_SCOPE = ['email', 'profile']

    # NOTE(review): tuple layout is taken over from backends/google.py;
    # see social-core's EXTRA_DATA documentation for the field meanings.
    EXTRA_DATA = [
        ('refresh_token', 'refresh_token', True),
        ('expires_in', 'expires'),
        ('token_type', 'token_type', True),
    ]
|
# copied from backends/google.py and modified to work with local example-oauth2-server |
|
######################################################################################END |
|
|
|
|
|
######################################################################################BEGIN |
|
# adapters to minimally satisfy social-core's framework like design |
|
from social_core.strategy import BaseStrategy |
|
from social_core.storage import UserMixin, BaseStorage |
|
from urllib.parse import urljoin |
|
from functools import wraps |
|
# secrets are included via os.environ |
|
# Settings handed to social-core through the strategy's get_setting().
# Provider keys and secrets are not listed here; they are added from
# os.environ further down in this file.
social_core_setting = {
    'SOCIAL_AUTH_SANITIZE_REDIRECTS': False,

    # sign in with linkedin is not automatic but needs reviewing!
    # 'SOCIAL_AUTH_LINKEDIN_OAUTH2_FIELD_SELECTORS': ['emailAddress'],
    # 'SOCIAL_AUTH_LINKEDIN_OAUTH2_EXTRA_DATA': [('id', 'id'),
    #                                            ('firstName', 'first_name'),
    #                                            ('lastName', 'last_name'),
    #                                            ('emailAddress', 'email_address')],

    # including email produces a popup, without not
    # 'SOCIAL_AUTH_FACEBOOK_SCOPE': ['email'],
    # 'SOCIAL_AUTH_FACEBOOK_PROFILE_EXTRA_PARAMS': {
    #     'fields': 'id, name, email'
    # },

    'SOCIAL_AUTH_FIELDS_STORED_IN_SESSION': [],

    # The last step is this file's own social_user() function.
    'SOCIAL_AUTH_PIPELINE': (
        'social_core.pipeline.social_auth.social_details',
        'social_core.pipeline.social_auth.social_uid',
        'social_core.pipeline.social_auth.auth_allowed',
        'test_social_core.social_user',
    ),
}
|
from social_core.utils import setting_name |
|
from social_core.backends.google import GoogleOAuth2 as google |
|
from social_core.backends.facebook import FacebookOAuth2 as facebook |
|
from social_core.backends.linkedin import LinkedinOAuth2 as linkedin |
|
from social_core.backends.instagram import InstagramOAuth2 as instagram |
|
from social_core.backends.twitter import TwitterOAuth as twitter |
|
from social_core.backends.pinterest import PinterestOAuth2 as pinterest |
|
# Maps a provider name (as used in the /auth/<provider> routes) to its
# backend class.  Only providers whose KEY and SECRET are both present in
# the environment are registered.
social_logins = {}
# Of these only google and facebook were tested, and here in this file only local.
for social in 'local google facebook linkedin instagram twitter pinterest'.split():
    try:
        sli = globals()[social]
        # Collect both credentials first, so that a provider with only one
        # of them configured leaves no partial entry in social_core_setting.
        credentials = {}
        for suffix in ('KEY', 'SECRET'):
            # e.g. 'SOCIAL_AUTH_' + sli.name.upper().replace('-', '_') + '_' + suffix
            envkey = setting_name(sli.name, suffix)
            credentials[envkey] = os.environ[envkey]
    except KeyError:
        # Credentials (or the backend name) are missing: the provider is
        # simply not offered.  Other errors are no longer swallowed.
        continue
    social_core_setting.update(credentials)
    social_logins[social] = sli
print(social_logins)
|
def social_login_name(cls):
    """Return the ``social_logins`` key registered for backend class *cls*.

    Classes are matched by ``__name__``.  The first matching key is
    returned; ``None`` is returned when no registered backend matches.
    """
    matching_keys = (
        key
        for key, backend_cls in social_logins.items()
        if backend_cls.__name__ == cls.__name__
    )
    return next(matching_keys, None)
|
class UserModel(UserMixin):
    """Stub user storage class; this example keeps no user records."""

    @classmethod
    def user_model(cls):
        """Return the type that stands in for a user model class."""
        # NOTE(review): the original remark was "type != dict" -- presumably
        # any type works as long as the dict-based user object created in
        # social_user() is not an instance of it; confirm against social-core.
        model_type = tuple
        return model_type
|
class storage_for_social_core(BaseStorage):
    """Storage stub for social-core that only supplies the user class."""

    # The stub user class defined above in this file.
    user = UserModel
|
class strategy_for_social_core(BaseStrategy):
    """Minimal social-core strategy on top of bottle's request/response.

    Settings are read from ``social_core_setting``.  Session values are kept
    in signed cookies whose names are prefixed with ``test_social_core_``.
    """

    def __init__(self, storage=None, tpl=None):
        super().__init__(storage, tpl)
        # Values set or popped during the current request, keyed by cookie
        # name.  An empty string marks a value that was popped.
        self.save = {}

    def get_setting(self, name):
        """Return the setting *name*; raises ``KeyError`` when it is not set."""
        return social_core_setting[name]

    def request_data(self, merge=True):
        """Return the request parameters of the current bottle request.

        With *merge* true ``request.params`` is returned; otherwise
        ``request.forms`` for POST requests and ``request.query`` for all
        other methods.
        """
        request = bottle.request
        if merge:
            return request.params
        if request.method == 'POST':
            return request.forms
        return request.query

    def redirect(self, url):
        """Redirect the client to *url* through ``bottle.redirect``."""
        return bottle.redirect(url)

    def session_get(self, name, default=None):
        """Return the session value *name*, or *default* when there is none.

        A value popped earlier in this request is treated as absent.
        Otherwise the signed request cookie is used, falling back to a value
        set earlier in this request.
        """
        nn = 'test_social_core_' + name
        # don't return deleted cookies
        if self.save.get(nn) == '':
            return default
        sessval = bottle.request.get_cookie(nn, secret=test_social_core_secret())
        # The saved copy is needed because of session_setdefault() in
        # social_core/strategy.py: a cookie set on the response is not yet
        # visible on the request.
        if sessval is None:
            sessval = self.save.get(nn, default)
        return sessval

    def session_set(self, name, value):
        """Store *value* under *name* in a signed cookie valid for 30 days."""
        nn = 'test_social_core_' + name
        self.save[nn] = value
        bottle.response.set_cookie(
            nn,
            value,
            secret=test_social_core_secret(),
            max_age=datetime.timedelta(days=30),
        )

    def session_pop(self, name):
        """Mark the session value *name* as deleted and delete its cookie."""
        nn = 'test_social_core_' + name
        # A dict assignment cannot raise KeyError, so no try/except is needed.
        self.save[nn] = ''
        bottle.response.delete_cookie(nn)

    def build_absolute_uri(self, path=None):
        """Return *path* joined onto the URL of the current request."""
        return urljoin(bottle.request.url, path or '')
|
def make_backend_obj():
    """Return a decorator that replaces a route's provider name by a backend.

    The decorated function is called as ``func(backend, *args, **kwargs)``,
    where ``backend`` is an instance of the class registered for the provider
    in ``social_logins``, created with a fresh strategy and a redirect URI of
    ``/auth/<provider>/callback`` resolved against the current request URL.
    A provider that is not registered results in a redirect to ``/``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(provider, *args, **kwargs):
            # Only the lookup is guarded: a KeyError raised inside the
            # wrapped handler is a real error and must not be turned into
            # a silent redirect.
            try:
                Backend = social_logins[provider]
            except KeyError:
                return bottle.redirect('/')
            strategy = strategy_for_social_core(storage_for_social_core)
            uri = urljoin(bottle.request.url, f'/auth/{provider}/callback')
            backend = Backend(strategy, redirect_uri=uri)
            return func(backend, *args, **kwargs)
        return wrapper
    return decorator
|
#this is called via social_core (see PIPELINE) |
|
def social_user(backend, uid, user=None, *args, **kwargs):
    """Pipeline step: remember the authenticated user in a signed cookie.

    This function is listed in ``SOCIAL_AUTH_PIPELINE`` and is therefore
    called by social-core, not by this file.

    ``kwargs['details']`` is extended with a ``'social'`` entry (the
    provider key from ``social_login_name``) and stored in the
    ``test_social_core_user`` cookie for 30 days.

    Returns *kwargs* with ``user`` (a dict-like object whose ``social``
    item is *user*), ``social`` (*user*) and ``is_new`` (``None``) set.
    Raises ``KeyError`` when no ``details`` keyword argument was passed.
    """
    class AttributeDict(dict):
        """dict whose items can also be read and written as attributes."""

        def __getattr__(self, name):
            # Raise AttributeError rather than KeyError, so that hasattr()
            # and getattr(obj, name, default) work as expected.
            try:
                return self[name]
            except KeyError:
                raise AttributeError(name) from None

        __setattr__ = dict.__setitem__

    info = kwargs['details']
    print(info)#
    info['social'] = social_login_name(backend.__class__)
    print(info)#
    bottle.response.set_cookie(
        'test_social_core_user',
        info,
        secret=test_social_core_secret(),
        max_age=datetime.timedelta(days=30),
        path='/',
    )
    # satisfy social_core: invert framework to library
    kwargs['user'] = AttributeDict()
    kwargs['social'] = user
    kwargs['is_new'] = None
    kwargs['user'].social = user
    return kwargs
|
# adapters to minimally satisfy social-core's framework like design |
|
######################################################################################END |
|
|
|
|
|
######################################################################################BEGIN |
|
# app with bottle |
|
from social_core.exceptions import SocialAuthBaseException |
|
from social_core.actions import do_auth, do_complete |
|
@bottle.route('/auth/<provider>', method=('GET', 'POST'))
@make_backend_obj()
def auth_login(backend):
    """Start the authentication with the backend selected by the route.

    Whatever ``do_auth`` returns is passed on as the response body.  A
    social-core error redirects to ``/``.
    """
    try:
        # NOTE(review): with redirect-based backends do_auth() ends in the
        # strategy's redirect(); returning its result also covers backends
        # that hand back content instead -- confirm against social-core.
        return do_auth(backend)
    except SocialAuthBaseException:
        bottle.redirect('/')
|
@bottle.route('/auth/<provider>/callback', method=('GET', 'POST'))
@make_backend_obj()
def auth_callback(backend):
    """Complete the authentication and go back to the start page.

    The result of ``do_complete`` is not needed here: the pipeline step
    ``social_user`` has already stored the user information in a cookie.
    A social-core error is deliberately ignored; either way the client is
    redirected to ``/``.
    """
    try:
        do_complete(backend, login=None)
    except SocialAuthBaseException:
        # Best effort: a failed login simply shows the start page again.
        pass
    bottle.redirect('/')
|
@bottle.route('/logout')
def logout():
    """Delete the user cookie and redirect to the start page."""
    # The cookie is created with path='/' in social_user(); it is deleted
    # with the same path so that the deletion addresses the same cookie.
    bottle.response.delete_cookie('test_social_core_user', path='/')
    bottle.redirect('/')
|
@bottle.route('/')
def start():
    """Render the start page.

    Shows a logout link naming the provider when a valid
    ``test_social_core_user`` cookie is present, otherwise a login link
    for the ``local`` provider.
    """
    try:
        info = bottle.request.get_cookie('test_social_core_user',secret=test_social_core_secret())
    except Exception:
        # An unreadable cookie is treated like no cookie at all.
        # (Exception instead of a bare except, so that KeyboardInterrupt
        # and SystemExit are not swallowed.)
        info = {}
    if info:
        return f"""<html>
<body>
<a href="/logout">logout {info['social']}</a>
</body>
</html>
"""
    else:
        return """<html>
<body>
<a href="/auth/local">login</a>
</body>
</html>
"""
|
|
|
|
|
# Script entry point: start bottle's server with its default settings.
if __name__ == '__main__':
    bottle.run()