Last active
September 26, 2024 11:59
-
-
Save jay0lee/67e2094d352e2a9f89010636d2da271d to your computer and use it in GitHub Desktop.
Google 3-legged OAuth with optional mTLS support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Perform 3-legged OAuth with optional mTLS and Google Workspace API calls | |
''' | |
import argparse | |
import http.client | |
import json | |
import os | |
from pathlib import Path | |
import sys | |
from urllib.parse import parse_qs, urlparse | |
import google.oauth2.credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
import google_auth_httplib2 | |
import googleapiclient | |
from googleapiclient.discovery import build | |
import httplib2 | |
import oauthlib | |
# OAuth 2.0 token endpoints. The mTLS variant is used whenever a client
# certificate and key are supplied.
TOKEN_URI = 'https://oauth2.googleapis.com/token'
MTLS_TOKEN_URI = 'https://oauth2.mtls.googleapis.com/token'

# map the short scope name to the long one. Google always returns the long
# scope and it confuses google-auth and prints a warning.
scope_mapping = {
    'email': 'https://www.googleapis.com/auth/userinfo.email',
    'profile': 'https://www.googleapis.com/auth/userinfo.profile',
}

# minimal scopes needed to perform our basic service call.
# Keys are the values accepted by --call-service.
service_scope_mapping = {
    'drive': 'https://www.googleapis.com/auth/drive.appdata',
    'gmail': 'https://www.googleapis.com/auth/gmail.metadata',
    # previously had a stray leading comma which made the scope invalid
    'calendar': 'https://www.googleapis.com/auth/calendar.readonly',
    'cloudsearch': 'https://www.googleapis.com/auth/cloud_search.settings.query',
    'groups': 'https://www.googleapis.com/auth/cloud-identity.groups.readonly',
    'vault': 'https://www.googleapis.com/auth/ediscovery.readonly',
    'directory': 'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'chat': 'https://www.googleapis.com/auth/chat.spaces.readonly',
    'meet': 'https://www.googleapis.com/auth/meetings.space.readonly',
    'keep': 'https://www.googleapis.com/auth/keep.readonly',
    'tasks': 'https://www.googleapis.com/auth/tasks.readonly',
    'resourcemanager': 'https://www.googleapis.com/auth/cloudplatformprojects.readonly',
}
def check_readable_file(filename):
    '''
    utility function to ensure user specified a file we can read.

    Used as an argparse ``type=`` callback. Returns filename unchanged when
    it names an existing regular file the current user may read; otherwise
    raises argparse.ArgumentTypeError so argparse reports a usage error.
    '''
    if not os.path.exists(filename):
        raise argparse.ArgumentTypeError(f'{filename} does not exist')
    # existence alone is not enough: a directory or an unreadable file would
    # only fail later, deep inside the TLS setup.
    if not os.path.isfile(filename):
        raise argparse.ArgumentTypeError(f'{filename} is not a file')
    if not os.access(filename, os.R_OK):
        raise argparse.ArgumentTypeError(f'{filename} is not readable')
    return filename
def parse_args():
    '''
    parses the script's arguments

    Returns the argparse namespace. args.scope is always a single
    space-separated string of scopes with the short names from scope_mapping
    expanded. Exits through parser.error() when --cert/--key are not given
    together or when neither --scope nor --call-service is supplied.
    When --debug is set, HTTP debug output is switched on for http.client
    and httplib2 as a side effect.
    '''
    parser = argparse.ArgumentParser(
        description='Perform 3-legged Google OAuth with optional mTLS.')
    parser.add_argument(
        '--scope',
        help=('OAuth scopes to request. Separate scopes with a space. If '
              'not specified and --call-service is specified will use '
              'scope appropriate to the service.'))
    parser.add_argument('--credentials-file',
                        help='file where credentials will be stored.',
                        required=True)
    parser.add_argument('--cert',
                        type=check_readable_file,
                        metavar='FILE',
                        help=('optional. File with the client certificate for '
                              'mTLS. If specified --key must also be '
                              'specified.'))
    parser.add_argument('--key',
                        type=check_readable_file,
                        metavar='FILE',
                        help=('optional. file with the client private key for '
                              'mTLS. If specified --cert must also be '
                              'specified.'))
    parser.add_argument('--force-refresh',
                        help='optional. force access token refresh.',
                        action='store_true')
    parser.add_argument('--call-service',
                        help=('optional. make an API call to a service. '
                              'Default is to output access token.'),
                        # sites is not a choice because there is no New Sites API.
                        choices=['drive',
                                 'gmail',
                                 'calendar',
                                 'cloudsearch',
                                 'groups',
                                 'vault',
                                 'directory',
                                 'chat',
                                 'meet',
                                 'keep',
                                 'tasks',
                                 'resourcemanager'])
    parser.add_argument('--debug',
                        help='output HTTP debug logging of API calls.',
                        action='store_true')
    args = parser.parse_args()
    if args.cert and not args.key:
        parser.error('When specifying --cert you must also specify --key.')
    elif args.key and not args.cert:
        parser.error('When specifying --key you must also specify --cert.')
    if args.call_service == 'keep':
        print(('\nWARNING: Keep API may not support 3-legged OAuth and you '
               'may see an error on the consent screen about invalid '
               'scopes.\n'))
    # split on any run of whitespace so doubled or stray spaces never turn
    # into empty scope names.
    scopes = (args.scope or '').split()
    if not scopes:
        if not args.call_service:
            parser.error('You must specify at least --scope or --call-service.')
        # if --scope wasn't set but --call-service was use a sane default
        scopes = [service_scope_mapping.get(args.call_service)]
    # substitute our scope mappings as needed
    args.scope = ' '.join(scope_mapping.get(scope, scope) for scope in scopes)
    if args.debug:
        # enable debugging for requests used by google-auth
        http.client.HTTPConnection.debuglevel = 1
        # enable debugging for httplib2 used by googleapiclient
        httplib2.debuglevel = 4
    return args
# pylint: disable=too-many-arguments
def get_valid_credentials(credentials_file,
                          debug,
                          scope,
                          force_refresh,
                          cert,
                          key):
    '''
    reads credentials from file or creates new ones.
    Ensures returned credentials are valid.

    credentials_file: path of the JSON file holding the stored credentials;
                      created when missing and rewritten whenever new or
                      refreshed credentials were obtained.
    debug:            print progress messages when true.
    scope:            space-separated string of OAuth scopes.
    force_refresh:    refresh the access token even if it is still valid.
    cert, key:        optional client certificate / private key files; when
                      both are set the mTLS token endpoint is used.

    Returns the google.oauth2.credentials.Credentials object. Exits the
    process with status 1 when the token refresh fails.
    '''
    # Needs to be set so oauthlib doesn't puke when Google changes our scopes
    os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = 'true'
    new_credentials = False
    refreshed_credentials = False
    if not os.path.exists(credentials_file):
        new_credentials = True
        # credentials_file is a plain string, so build a Path instance
        # instead of calling the unbound Path.touch on a str. The file will
        # hold OAuth tokens, so create it readable by the owner only.
        Path(credentials_file).touch(mode=0o600)
    with open(credentials_file, 'r+', encoding="utf-8") as creds_file:
        try:
            credentials_data = json.load(creds_file)
        except json.decoder.JSONDecodeError as err:
            # a freshly created (empty) file is expected to fail parsing;
            # only complain when the file already existed.
            if not new_credentials:
                print(f'JSON parsing error reading {credentials_file}: {err}')
                print('will reauthenticate.')
            credentials_data = None
        if not credentials_data:
            new_credentials = True
            if debug:
                print('Initiating authorization flow.')
            credentials_data = auth_flow(cert, key, scope)
        # split on any whitespace so stray spaces never yield empty scopes
        scopes = scope.split()
        creds = google.oauth2.credentials.Credentials.from_authorized_user_info(
            info=credentials_data,
            scopes=scopes)
        # make sure we're using the correct mTLS or non-mTLS endpoints
        # as google.auth seems to revert back to non-mTLS for some reason.
        # pylint: disable=protected-access
        if cert and key:
            creds._token_uri = MTLS_TOKEN_URI
        else:
            creds._token_uri = TOKEN_URI
        if not creds.valid or force_refresh:
            refreshed_credentials = True
            if debug:
                print('Initiating access token refresh.')
            request = build_request(cert, key)
            try:
                creds.refresh(request)
            except google.auth.exceptions.RefreshError as err:
                print(f'ERROR: {err}')
                sys.exit(1)
        if new_credentials or refreshed_credentials:
            # persist the current state, replacing whatever the file held
            credentials_data = json.loads(creds.to_json())
            creds_file.truncate(0)
            creds_file.seek(0)
            creds_file.write(json.dumps(credentials_data, indent=2, sort_keys=True))
    return creds
def build_httpc(cert, key):
    '''
    creates an httplib2.Http client. When both cert and key
    are given, the pair is registered on the client for mTLS.
    '''
    client = httplib2.Http()
    if not (cert and key):
        return client
    client.add_certificate(key, cert, "")
    return client
def build_request(cert, key):
    '''
    wraps the client from build_httpc() in a
    google_auth_httplib2.Request and returns it.
    '''
    return google_auth_httplib2.Request(build_httpc(cert, key))
def auth_flow(cert, key, scope):
    '''
    performs 3-legged OAuth authorization flow.

    Prints the authorization URL, asks the user to paste back either the
    localhost redirect URL or the bare authorization code, and exchanges
    the code for tokens. When cert and key are both set the exchange goes
    to the mTLS token endpoint using the client certificate.

    Returns the new credentials as a dict (parsed from their JSON form).
    Exits the process with status 1 when the pasted response carries an
    OAuth error, contains no authorization code, or access is denied.
    '''
    use_mtls = bool(cert and key)
    client_config = {
        'installed': {
            "auth_uri": "https://accounts.google.com/o/oauth2/v2/auth",
            "client_id": "978969553067-73svibj9j7d5sbq6c5587qorl7qouesh.apps.googleusercontent.com",
            "client_secret": "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",
            'token_uri': MTLS_TOKEN_URI if use_mtls else TOKEN_URI,
        },
    }
    kwargs = {}
    if use_mtls:
        kwargs['cert'] = (cert, key)
    flow = InstalledAppFlow.from_client_config(client_config=client_config,
                                               scopes=scope,
                                               autogenerate_code_verifier=True)
    flow.redirect_uri = 'http://localhost:8080/'
    auth_url, _ = flow.authorization_url()
    # strip so whitespace picked up while copy/pasting doesn't end up in
    # the code or defeat the URL detection below.
    response = input(f'''Please go to:
{auth_url}
and approve. You'll be redirected to an error page. Copy the localhost URL of
the error page and paste it here.: ''').strip()
    if response.startswith('http'):
        parsed_params = parse_qs(urlparse(response).query)
        # a denied or failed consent redirects with ?error=... and no code
        if 'error' in parsed_params:
            print(f"ERROR: authorization failed: {parsed_params['error'][0]}")
            sys.exit(1)
        code = parsed_params.get('code', [None])[0]
    else:
        code = response
    if not code:
        print('ERROR: no authorization code found in the pasted response.')
        sys.exit(1)
    kwargs['code'] = code
    try:
        flow.fetch_token(**kwargs)
    except oauthlib.oauth2.rfc6749.errors.AccessDeniedError as err:
        print(f'ERROR: {err}')
        sys.exit(1)
    return json.loads(flow.credentials.to_json())
def setup_client_options(cert, key):
    '''
    builds the client_options dict used when building an API.
    The dict is empty unless both cert and key are given; then it holds
    a callback returning (cert, key, None) and the mTLS environment
    variables are set.
    '''
    if not (cert and key):
        return {}
    # googleapiclient uses these env variables when building an API
    # to decide if it should use mTLS or not.
    os.environ.update({
        'GOOGLE_API_USE_CLIENT_CERTIFICATE': 'true',
        'GOOGLE_API_USE_MTLS_ENDPOINT': 'always',
    })
    return {'client_encrypted_cert_source': lambda: (cert, key, None)}
def build_service(api, api_ver, creds, client_options):
    '''
    builds and returns the discovery-based service object for the
    given API name and version. Prints an error and exits with
    status 1 when the API name or version is unknown.
    '''
    build_kwargs = {
        'credentials': creds,
        'client_options': client_options,
        'cache_discovery': False,
        'static_discovery': False,
        'num_retries': 5,
    }
    try:
        service = build(api, api_ver, **build_kwargs)
    except googleapiclient.errors.UnknownApiNameOrVersion as err:
        print(f'ERROR: unknown API - {err}')
        sys.exit(1)
    return service
def build_api_call(creds, cert, key, call_service):
    '''
    returns the given service API call.

    creds:        credentials handed to the API client.
    cert, key:    optional client certificate / key files for mTLS.
    call_service: one of the --call-service choices, or None.

    Returns a tuple (svc, functions, params): the built service object, the
    chain of attribute names leading to the API method, and the keyword
    arguments for that method. When call_service is None the access token
    is printed and the process exits with status 0. Raises ValueError for
    a service name that has no call defined.
    '''
    client_options = setup_client_options(cert, key)
    if call_service is None:
        print(creds.token)
        sys.exit(0)
    # service name -> (api, api version, function chain, call parameters).
    # Built per call so every caller gets fresh list/dict objects.
    service_calls = {
        'drive': ('drive', 'v3', ['about', 'get'], {
            'fields': 'user(displayName,emailAddress),storageQuota',
        }),
        'gmail': ('gmail', 'v1', ['users', 'getProfile'], {
            'userId': 'me',
        }),
        'calendar': ('calendar', 'v3', ['calendars', 'get'], {
            'calendarId': 'primary',
        }),
        'cloudsearch': ('cloudsearch', 'v1',
                        ['settings', 'searchapplications', 'list'],
                        {'pageSize': 5}),
        # NOTE(review): the address in this query looks like a placeholder
        # left by e-mail obfuscation - confirm the intended member key.
        'groups': ('cloudidentity', 'v1',
                   ['groups', 'memberships', 'searchDirectGroups'], {
                       'parent': 'groups/-',
                       'query': "member_key_id == '[email protected]'",
                   }),
        'chat': ('chat', 'v1', ['spaces', 'list'], {'pageSize': 5}),
        'meet': ('meet', 'v2', ['conferenceRecords', 'list'], {'pageSize': 5}),
        'directory': ('admin', 'directory_v1', ['users', 'list'], {
            'customer': 'my_customer',
            'maxResults': 5,
            'viewType': 'domain_public',
            'fields': 'users(primaryEmail,name,id)',
        }),
        'vault': ('vault', 'v1', ['matters', 'list'], {'pageSize': 5}),
        # Note: currently Keep API requires domain-wide delegation and won't
        # work with 3-legged OAuth. We expect Keep to fail here.
        'keep': ('keep', 'v1', ['notes', 'list'], {'pageSize': 5}),
        'tasks': ('tasks', 'v1', ['tasklists', 'list'], {'maxResults': 5}),
        'resourcemanager': ('cloudresourcemanager', 'v3',
                            ['projects', 'search'], {'pageSize': 5}),
    }
    try:
        api, api_ver, functions, params = service_calls[call_service]
    except KeyError:
        # fail with a clear message instead of an unbound-variable error
        raise ValueError(f'unsupported service: {call_service}') from None
    # Calendar API discovery doc at:
    #
    # https://calendar-json.googleapis.com/$discovery/rest?version=v3
    #
    # doesn't currently have an mtlsRootUrl value so we have to set endpoint
    # manually
    if call_service == 'calendar' and cert and key:
        client_options['api_endpoint'] = 'https://www.mtls.googleapis.com/calendar/v3/'
    svc = build_service(api, api_ver, creds, client_options)
    return (svc, functions, params)
def call_api(svc, functions, params):
    '''
    performs actual Google API call.

    svc:       service object to start from.
    functions: list of attribute names; every name but the last is looked
               up and called without arguments to reach the next object,
               the last one is the method called with params.
    params:    keyword arguments for the final method.

    Prints the result as sorted, indented JSON. Prints an error and exits
    with status 1 on an HTTP error or a token refresh error.
    '''
    # index/slice instead of pop(0) so the caller's list is left untouched
    api_call = getattr(svc, functions[0])
    for func in functions[1:]:
        api_call = getattr(api_call(), func)
    try:
        result = api_call(**params).execute()
    except googleapiclient.errors.HttpError as err:
        print(f'API Call ERROR: {err}')
        sys.exit(1)
    except google.auth.exceptions.RefreshError as err:
        print(f'Token refresh ERROR: {err}')
        sys.exit(1)
    print(json.dumps(result,
                     sort_keys=True,
                     indent=2))
def main():
    '''
    script entry point: parses the arguments, obtains valid
    credentials, then builds and performs the requested API call.
    '''
    args = parse_args()
    creds = get_valid_credentials(args.credentials_file,
                                  args.debug,
                                  args.scope,
                                  args.force_refresh,
                                  args.cert,
                                  args.key)
    api_call = build_api_call(creds,
                              args.cert,
                              args.key,
                              args.call_service)
    # api_call is the (svc, functions, params) tuple
    call_api(*api_call)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment