Last active
December 12, 2015 06:29
-
-
Save KazuyaHayashi/4729929 to your computer and use it in GitHub Desktop.
Get the logged-in user's information via OAuth2 authentication.
Check whether the user is an admin by accessing the Provisioning API with a JWT.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import flask | |
import logging | |
import json | |
from flask import request, redirect | |
import gdata.gauth | |
import gdata.apps.service | |
import gdata.apps.groups.service | |
import gdata.auth | |
from gdata.apps.service import AppsForYourDomainException | |
from jwtutils import getServiceAccountsAccessToken | |
# WSGI application object; the route decorators in this file register views on it.
application = flask.Flask(__name__)
# Debug mode is switched on unconditionally when the module is imported.
application.debug = True
@application.route('/')
def hello_world():
    """Return a fixed greeting string for the site root."""
    greeting = 'hello world'
    return greeting
import os | |
@application.route('/jwt/groups')
def jwt_get_groups():
    """List the domain's group IDs using a service-account access token.

    A token is requested from ``getServiceAccountsAccessToken`` for a fixed
    account and sent as the ``Authorization`` header of a ``GroupsService``
    client.  The response body is ``<br/>`` followed by each group's
    ``groupId``, every one terminated by ``<br/>``.
    """
    service_token = getServiceAccountsAccessToken('[email protected]')
    groups_client = gdata.apps.groups.service.GroupsService(
        source='APP_NAME', domain='apps00free.prd.demodesu.com',
        additional_headers={"Authorization": "OAuth %s" % service_token})
    rows = ["%s<br/>" % entry['groupId']
            for entry in groups_client.RetrieveAllGroups()]
    return "<br/>" + "".join(rows)
def getClientsSecrets():
    """Load the OAuth2 client configuration from ``client_secrets.json``.

    The file is opened relative to the current working directory and the
    whole document is parsed, so single-line and pretty-printed JSON files
    are both accepted.

    Returns:
        The decoded JSON value.  Callers in this file index it as
        ``secrets['web'][...]``.

    Raises:
        IOError: if the file cannot be opened.
        ValueError: if the file does not contain valid JSON.
    """
    # ``with`` closes the handle even when parsing fails.
    with open("client_secrets.json") as secrets_file:
        return json.load(secrets_file)
@application.route('/oauth2/groups')
def oauth2_groups():
    """Render the domain's group IDs followed by the process environment.

    The token stored in the ``access_token`` file is sent as the
    ``Authorization`` header of a ``GroupsService`` client.  The returned
    HTML page lists every ``groupId`` and then every environment variable
    as ``name = value``, each terminated by ``<br/>``.
    """
    # ``with`` closes the handle even if the read raises.
    with open('access_token', 'r') as token_file:
        # Drop any line terminator so it cannot end up inside the header value.
        access_token = token_file.readline().strip()
    oauth2_auth_header = "OAuth %s" % access_token
    service = gdata.apps.groups.service.GroupsService(
        domain='apps00free.prd.demodesu.com',
        additional_headers={"Authorization": oauth2_auth_header})
    groups = service.RetrieveAllGroups()
    group_names = "<br/>" + "".join(
        "%s<br/>" % group['groupId'] for group in groups)
    # NOTE(review): every environment variable is written into the response
    # unescaped.  Confirm this debugging aid is never reachable in a
    # deployed instance, since the environment may hold secrets.
    env_str = "".join(
        "%s = %s<br/>" % (env_key, env_value)
        for env_key, env_value in os.environ.items())
    return """
<html>
<head></head>
<body>""" + group_names + """
<br/>env<br/>""" + env_str + """
</body>
</html>
"""
@application.route('/oauth2/user')
def oauth2_user():
    """Look up the hard-coded account ``user00`` and show its login name.

    The token stored in the ``access_token`` file is sent as the
    ``Authorization`` header of an ``AppsService`` client.  The returned
    HTML page contains the ``login.user_name`` of the retrieved user.
    """
    # ``with`` closes the handle even if the read raises.
    with open('access_token', 'r') as token_file:
        # Drop any line terminator so it cannot end up inside the header value.
        access_token = token_file.readline().strip()
    oauth2_auth_header = "OAuth %s" % access_token
    service = gdata.apps.service.AppsService(
        domain='apps00free.prd.demodesu.com',
        additional_headers={"Authorization": oauth2_auth_header})
    user = service.RetrieveUser('user00')
    return """
<html>
<head></head>
<body>""" + user.login.user_name + """
</body>
</html>
"""
@application.route('/oauth2/me')
def oauth2_me():
    """Report whether the logged-in user is an administrator of their domain.

    Steps:
      1. Read the OAuth2 token stored in the ``access_token`` file and use it
         to fetch the user's profile from the Google userinfo endpoint.
      2. Obtain a service-account token for the profile's ``email`` and use
         it to query ``AppsService`` for the domain given by the profile's
         ``hd`` field.
      3. Return a one-line message saying whether the user is an admin.

    A user whose lookup raises ``AppsForYourDomainException`` is reported as
    *not* admin, and the exception is logged.

    NOTE(review): a userinfo response lacking ``email`` or ``hd`` (for
    example an error payload) raises ``KeyError`` here, as it did before.
    """
    # ``with`` closes the handle even if the read raises.
    with open('access_token', 'r') as token_file:
        # Drop any line terminator so it cannot end up inside the header value.
        access_token = token_file.readline().strip()
    import httplib2
    http = httplib2.Http()
    _resp, content = http.request(
        'https://www.googleapis.com/oauth2/v1/userinfo',
        'GET',
        headers={'Authorization': 'Bearer %s' % access_token})
    data = json.loads(content)
    jwt = getServiceAccountsAccessToken(data['email'])
    jwt_header = "OAuth %s" % jwt
    service = gdata.apps.service.AppsService(
        source='APP_NAME', domain=data['hd'],
        additional_headers={"Authorization": jwt_header})
    # Computed outside the ``try`` so only the remote call is guarded.
    user_name = data['email'].split('@')[0]
    try:
        user = service.RetrieveUser(user_name)
    except AppsForYourDomainException as e:
        logging.error(e)
        return "%s is *not* admin user" % user_name
    # gdata appears to expose the ``admin`` attribute as the text
    # 'true'/'false' (TODO confirm against the installed gdata version); a
    # plain truth test would treat 'false' as admin, so compare explicitly.
    # A real boolean True still passes this check.
    is_admin = str(user.login.admin).lower() == 'true'
    if is_admin:
        return "%s is admin user" % user.login.user_name
    return "%s is *not* admin user" % user.login.user_name
@application.route('/oauth2/login')
def oauth2_login():
    """Redirect the browser to the OAuth2 authorization URL.

    Client id, client secret and the first registered redirect URI are read
    from the ``web`` section of the client secrets.  The requested scopes
    are the userinfo e-mail and profile scopes.
    """
    web_config = getClientsSecrets()['web']
    requested_scope = (
        'https://www.googleapis.com/auth/userinfo.email '
        'https://www.googleapis.com/auth/userinfo.profile')
    oauth_token = gdata.gauth.OAuth2Token(
        client_id=web_config['client_id'],
        client_secret=web_config['client_secret'],
        scope=requested_scope,
        user_agent='oauth2-provisioningv2')
    return redirect(oauth_token.generate_authorize_url(
        redirect_uri=web_config['redirect_uris'][0]))
@application.route('/oauth2/callback')
def oauth2_callback():
    """Exchange the authorization code for an access token and store it.

    Reads the ``code`` query parameter, exchanges it through an
    ``OAuth2Token`` built from the client secrets, writes the resulting
    access token to the ``access_token`` file and redirects to
    ``/oauth2/me``.

    Responds with HTTP 400 when the request carries no ``code`` parameter,
    instead of passing ``None`` into the token exchange.
    """
    code = request.args.get("code", None)
    if not code:
        flask.abort(400)
    client_secrets = getClientsSecrets()
    token = gdata.gauth.OAuth2Token(
        client_id=client_secrets['web']['client_id'],
        client_secret=client_secrets['web']['client_secret'],
        scope=(
            'https://www.googleapis.com/auth/userinfo.email '
            'https://www.googleapis.com/auth/userinfo.profile'),
        user_agent='oauth2-provisioningv2')
    # The return value is discarded; presumably this call is kept so the
    # token object records the redirect URI before the exchange -- TODO confirm.
    token.generate_authorize_url(
        redirect_uri=client_secrets['web']['redirect_uris'][0])
    token.get_access_token(code)
    # NOTE(review): one file holds the token for every visitor, so the other
    # /oauth2/* views act on behalf of whoever logged in last.
    with open("access_token", 'w') as token_file:
        token_file.write(token.access_token)
    return redirect('/oauth2/me')
if __name__ == '__main__':
    # Start Flask's built-in server when the file is executed directly.
    # NOTE(review): debug mode bound to 0.0.0.0 exposes the interactive
    # debugger on every network interface -- confirm this is only ever run
    # on a trusted machine.
    application.run(debug=True, host='0.0.0.0')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment