Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active September 26, 2024 11:59
Show Gist options
  • Save jay0lee/67e2094d352e2a9f89010636d2da271d to your computer and use it in GitHub Desktop.
Save jay0lee/67e2094d352e2a9f89010636d2da271d to your computer and use it in GitHub Desktop.
Google 3-legged OAuth with optional mTLS support
#!/usr/bin/env python3
'''
Perform 3-legged OAuth with optional mTLS and Google Workspace API calls
'''
import argparse
import http.client
import json
import os
from pathlib import Path
import sys
from urllib.parse import parse_qs, urlparse
import google.oauth2.credentials
from google_auth_oauthlib.flow import InstalledAppFlow
import google_auth_httplib2
import googleapiclient
from googleapiclient.discovery import build
import httplib2
import oauthlib
# OAuth 2.0 token endpoints: the regular one and the mTLS one that is used
# when a client certificate and key are supplied.
TOKEN_URI = 'https://oauth2.googleapis.com/token'
MTLS_TOKEN_URI = 'https://oauth2.mtls.googleapis.com/token'

# map the short scope name to the long one. Google always returns the long
# scope and it confuses google-auth and prints a warning.
scope_mapping = {
    'email': 'https://www.googleapis.com/auth/userinfo.email',
    'profile': 'https://www.googleapis.com/auth/userinfo.profile',
}

# minimal scopes needed to perform our basic service call.
# Keys are the values accepted by --call-service.
service_scope_mapping = {
    'drive': 'https://www.googleapis.com/auth/drive.appdata',
    'gmail': 'https://www.googleapis.com/auth/gmail.metadata',
    # fixed: this value used to start with a stray comma, which made the
    # default scope for --call-service calendar invalid.
    'calendar': 'https://www.googleapis.com/auth/calendar.readonly',
    'cloudsearch': 'https://www.googleapis.com/auth/cloud_search.settings.query',
    'groups': 'https://www.googleapis.com/auth/cloud-identity.groups.readonly',
    'vault': 'https://www.googleapis.com/auth/ediscovery.readonly',
    'directory': 'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'chat': 'https://www.googleapis.com/auth/chat.spaces.readonly',
    'meet': 'https://www.googleapis.com/auth/meetings.space.readonly',
    'keep': 'https://www.googleapis.com/auth/keep.readonly',
    'tasks': 'https://www.googleapis.com/auth/tasks.readonly',
    'resourcemanager': 'https://www.googleapis.com/auth/cloudplatformprojects.readonly',
}
def check_readable_file(filename):
    '''
    utility function to ensure user specified a file we can read.

    Intended as an argparse ``type=`` callable: returns filename unchanged
    when it names an existing, readable regular file and raises
    argparse.ArgumentTypeError otherwise so argparse reports a clean
    usage error.
    '''
    if not os.path.exists(filename):
        raise argparse.ArgumentTypeError(f'{filename} does not exist')
    # existence alone is not enough: a directory or a file we lack read
    # permission on would only fail later, deep inside the TLS setup.
    if not os.path.isfile(filename):
        raise argparse.ArgumentTypeError(f'{filename} is not a file')
    if not os.access(filename, os.R_OK):
        raise argparse.ArgumentTypeError(f'{filename} is not readable')
    return filename
def parse_args():
    '''
    parses the script's arguments

    Returns the argparse namespace. On return ``args.scope`` is a single
    space-separated string in which the short names found in scope_mapping
    have been replaced by their long form. Exits through ``parser.error``
    when only one of --cert/--key is given or when no scope can be
    determined. With --debug, HTTP debug output is switched on for
    http.client and httplib2 as a side effect.
    '''
    parser = argparse.ArgumentParser(
        description='Perform 3-legged Google OAuth with optional mTLS.')
    parser.add_argument(
        '--scope',
        help=('OAuth scopes to request. Separate scopes with a space. If '
              'not specified and --call-service is specified will use '
              'scope appropriate to the service.'))
    parser.add_argument('--credentials-file',
                        help='file where credentials will be stored.',
                        required=True)
    parser.add_argument('--cert',
                        type=check_readable_file,
                        metavar='FILE',
                        help=('optional. File with the client certificate for '
                              'mTLS. If specified --key must also be '
                              'specified.'))
    parser.add_argument('--key',
                        type=check_readable_file,
                        metavar='FILE',
                        help=('optional. file with the client private key for '
                              'mTLS. If specified --cert must also be '
                              'specified.'))
    parser.add_argument('--force-refresh',
                        help='optional. force access token refresh.',
                        action='store_true')
    parser.add_argument('--call-service',
                        help=('optional. make an API call to a service. '
                              'Default is to output access token.'),
                        # sites is not a choice because there is no New Sites API.
                        # Derived from service_scope_mapping so every choice
                        # is guaranteed to have a default scope.
                        choices=list(service_scope_mapping))
    parser.add_argument('--debug',
                        help='output HTTP debug logging of API calls.',
                        action='store_true')
    args = parser.parse_args()
    if args.cert and not args.key:
        parser.error('When specifying --cert you must also specify --key.')
    elif args.key and not args.cert:
        parser.error('When specifying --key you must also specify --cert.')
    if args.call_service == 'keep':
        print(('\nWARNING: Keep API may not support 3-legged OAuth and you '
               'may see an error on the consent screen about invalid '
               'scopes.\n'))
    # split() without an argument drops empty entries caused by repeated,
    # leading or trailing whitespace in --scope.
    requested = (args.scope or '').split()
    # if --scope wasn't set but --call-service was use a sane default
    if not requested and args.call_service:
        requested = [service_scope_mapping[args.call_service]]
    if not requested:
        parser.error('You must specify at least --scope or --call-service.')
    # substitute our scope mappings as needed
    args.scope = ' '.join(scope_mapping.get(scope, scope) for scope in requested)
    if args.debug:
        # enable debugging for requests used by google-auth
        http.client.HTTPConnection.debuglevel = 1
        # enable debugging for httplib2 used by googleapiclient
        httplib2.debuglevel = 4
    return args
# pylint: disable=too-many-arguments
def get_valid_credentials(credentials_file,
                          debug,
                          scope,
                          force_refresh,
                          cert,
                          key):
    '''
    reads credentials from file or creates new ones.
    Ensures returned credentials are valid.

    credentials_file: path of the JSON file holding the stored credentials;
        created if missing and rewritten whenever the credentials are new
        or refreshed.
    debug: print progress messages when true.
    scope: space-separated string of OAuth scopes.
    force_refresh: refresh the access token even if it is still valid.
    cert / key: client certificate and private key files; when both are
        set the mTLS token endpoint is used.

    Returns the google.oauth2.credentials.Credentials object. Exits the
    process with status 1 if the token refresh fails.
    '''
    # Needs to be set so oauthlib doesn't puke when Google changes our scopes
    os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = 'true'
    new_credentials = False
    refreshed_credentials = False
    if not os.path.exists(credentials_file):
        new_credentials = True
        # credentials_file is a plain string, so wrap it in Path rather than
        # calling the unbound Path.touch on it. The file will hold OAuth
        # tokens, so create it readable by the owner only.
        Path(credentials_file).touch(mode=0o600)
    with open(credentials_file, 'r+', encoding="utf-8") as creds_file:
        try:
            credentials_data = json.load(creds_file)
        except json.decoder.JSONDecodeError as err:
            # a file we just created is expected to be empty; only complain
            # about files that already existed.
            if not new_credentials:
                print(f'JSON parsing error reading {credentials_file}: {err}')
                print('will reauthenticate.')
            credentials_data = None
        if not credentials_data:
            new_credentials = True
            if debug:
                print('Initiating authorization flow.')
            credentials_data = auth_flow(cert, key, scope)
        # split() tolerates repeated or surrounding whitespace in scope
        scopes = scope.split()
        creds = google.oauth2.credentials.Credentials.from_authorized_user_info(
            info=credentials_data,
            scopes=scopes)
        # make sure we're using the correct mTLS or non-mTLS endpoints
        # as google.auth seems to revert back to non-mTLS for some reason.
        # pylint: disable=protected-access
        if cert and key:
            creds._token_uri = MTLS_TOKEN_URI
        else:
            creds._token_uri = TOKEN_URI
        if not creds.valid or force_refresh:
            refreshed_credentials = True
            if debug:
                print('Initiating access token refresh.')
            request = build_request(cert, key)
            try:
                creds.refresh(request)
            except google.auth.exceptions.RefreshError as err:
                print(f'ERROR: {err}')
                sys.exit(1)
        if new_credentials or refreshed_credentials:
            # persist the current state by overwriting the file in place
            credentials_data = json.loads(creds.to_json())
            creds_file.truncate(0)
            creds_file.seek(0)
            creds_file.write(json.dumps(credentials_data, indent=2, sort_keys=True))
    return creds
def build_httpc(cert, key):
    '''
    creates an httplib2.Http object. When both cert and key are given
    they are registered on it as the client certificate (empty domain
    string).
    '''
    client = httplib2.Http()
    use_mtls = cert and key
    if use_mtls:
        client.add_certificate(key, cert, "")
    return client
def build_request(cert, key):
    '''
    wraps the Http object produced by build_httpc(cert, key) in a
    google_auth_httplib2.Request and returns it.
    '''
    return google_auth_httplib2.Request(build_httpc(cert, key))
def _extract_auth_code(response):
    '''
    returns the authorization code from what the user pasted: either the
    bare code or the redirect URL carrying it in its ``code`` query
    parameter. Returns None when no code can be found.
    '''
    response = response.strip()
    if not response.startswith('http'):
        return response or None
    query = parse_qs(urlparse(response).query)
    return query.get('code', [None])[0]


def auth_flow(cert, key, scope):
    '''
    performs 3-legged OAuth authorization flow.

    cert / key: client certificate and private key files; when both are
        set the token exchange goes to the mTLS token endpoint using them.
    scope: space-separated string of OAuth scopes to request.

    Prompts the user to approve access in a browser and paste back the
    resulting redirect URL (or the bare code). Returns the new credentials
    as a dict parsed from their JSON form. Exits the process with status 1
    when no authorization code was supplied or access was denied.
    '''
    client_config = {
        'installed': {
            "auth_uri": "https://accounts.google.com/o/oauth2/v2/auth",
            "client_id": "978969553067-73svibj9j7d5sbq6c5587qorl7qouesh.apps.googleusercontent.com",
            "client_secret": "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",
        },
    }
    kwargs = {}
    if cert and key:
        kwargs['cert'] = (cert, key)
        client_config['installed']['token_uri'] = MTLS_TOKEN_URI
    else:
        client_config['installed']['token_uri'] = TOKEN_URI
    # the flow expects a sequence of scopes, not one space-joined string
    flow = InstalledAppFlow.from_client_config(client_config=client_config,
                                               scopes=scope.split(),
                                               autogenerate_code_verifier=True)
    # nothing listens on this address; the user copies the URL of the
    # resulting error page back to us.
    flow.redirect_uri = 'http://localhost:8080/'
    auth_url, _ = flow.authorization_url()
    prompt = ('Please go to:\n'
              f'{auth_url}\n'
              "and approve. You'll be redirected to an error page. "
              'Copy the localhost URL of\n'
              'the error page and paste it here.: ')
    code = _extract_auth_code(input(prompt))
    if not code:
        # e.g. an empty paste, or a redirect URL that carries an error
        # instead of a code; fail clearly rather than inside fetch_token.
        print('ERROR: no authorization code found in the pasted response.')
        sys.exit(1)
    kwargs['code'] = code
    try:
        flow.fetch_token(**kwargs)
    except oauthlib.oauth2.rfc6749.errors.AccessDeniedError as err:
        print(f'ERROR: {err}')
        sys.exit(1)
    return json.loads(flow.credentials.to_json())
def setup_client_options(cert, key):
    '''
    builds the client_options dict for the API client.

    Without both cert and key the dict is empty. With both, it carries a
    callable under 'client_encrypted_cert_source' that returns
    (cert, key, None), and the mTLS environment switches are turned on.
    '''
    if not (cert and key):
        return {}
    # googleapiclient uses these env variables when building an API
    # to decide if it should use mTLS or not.
    os.environ['GOOGLE_API_USE_CLIENT_CERTIFICATE'] = 'true'
    os.environ['GOOGLE_API_USE_MTLS_ENDPOINT'] = 'always'
    return {'client_encrypted_cert_source': lambda: (cert, key, None)}
def build_service(api, api_ver, creds, client_options):
    '''
    builds the discovery-based service object for the given API name and
    version. Prints an error and exits with status 1 if the API name or
    version is unknown.
    '''
    build_kwargs = {
        'credentials': creds,
        'client_options': client_options,
        'cache_discovery': False,
        'static_discovery': False,
        'num_retries': 5,
    }
    try:
        service = build(api, api_ver, **build_kwargs)
    except googleapiclient.errors.UnknownApiNameOrVersion as err:
        print(f'ERROR: unknown API - {err}')
        sys.exit(1)
    return service
def build_api_call(creds, cert, key, call_service):
    '''
    returns the given service API call.

    creds: credentials passed to the service builder.
    cert / key: client certificate and private key files for mTLS, or None.
    call_service: one of the --call-service choices, or None.

    Returns a tuple (svc, functions, params): the built service object, the
    chain of attribute names leading to the API method, and the keyword
    arguments for that method. When call_service is None the access token
    is printed and the process exits with status 0. Raises ValueError for
    a service name that is not supported.
    '''
    if call_service is None:
        print(creds.token)
        sys.exit(0)
    # service name -> (discovery API name, version, method chain, params).
    # Rebuilt on every call so callers get their own lists and dicts.
    api_calls = {
        'drive': ('drive', 'v3', ['about', 'get'],
                  {'fields': 'user(displayName,emailAddress),storageQuota'}),
        'gmail': ('gmail', 'v1', ['users', 'getProfile'],
                  {'userId': 'me'}),
        'calendar': ('calendar', 'v3', ['calendars', 'get'],
                     {'calendarId': 'primary'}),
        'cloudsearch': ('cloudsearch', 'v1',
                        ['settings', 'searchapplications', 'list'],
                        {'pageSize': 5}),
        # NOTE(review): the address in this query looks like a redacted
        # placeholder - confirm the intended member key.
        'groups': ('cloudidentity', 'v1',
                   ['groups', 'memberships', 'searchDirectGroups'],
                   {'parent': 'groups/-',
                    'query': "member_key_id == '[email protected]'"}),
        'chat': ('chat', 'v1', ['spaces', 'list'], {'pageSize': 5}),
        'meet': ('meet', 'v2', ['conferenceRecords', 'list'],
                 {'pageSize': 5}),
        'directory': ('admin', 'directory_v1', ['users', 'list'],
                      {'customer': 'my_customer',
                       'maxResults': 5,
                       'viewType': 'domain_public',
                       'fields': 'users(primaryEmail,name,id)'}),
        'vault': ('vault', 'v1', ['matters', 'list'], {'pageSize': 5}),
        # Note: currently Keep API requires domain-wide delegation and won't
        # work with 3-legged OAuth. We expect Keep to fail here.
        'keep': ('keep', 'v1', ['notes', 'list'], {'pageSize': 5}),
        'tasks': ('tasks', 'v1', ['tasklists', 'list'], {'maxResults': 5}),
        'resourcemanager': ('cloudresourcemanager', 'v3',
                            ['projects', 'search'],
                            {'pageSize': 5}),
    }
    try:
        api, api_ver, functions, params = api_calls[call_service]
    except KeyError:
        # previously an unknown name fell through and failed later with an
        # unbound local variable; fail with a clear message instead.
        raise ValueError(f'unsupported service: {call_service!r}') from None
    client_options = setup_client_options(cert, key)
    # Calendar API discovery doc at:
    #
    # https://calendar-json.googleapis.com/$discovery/rest?version=v3
    #
    # doesn't currently have an mtlsRootUrl value so we have to set endpoint
    # manually
    if call_service == 'calendar' and cert and key:
        client_options['api_endpoint'] = 'https://www.mtls.googleapis.com/calendar/v3/'
    svc = build_service(api, api_ver, creds, client_options)
    return (svc, functions, params)
def call_api(svc, functions, params):
    '''
    performs actual Google API call.

    svc: the service object to start from.
    functions: attribute names to walk, e.g. ['users', 'list'] results in
        svc.users().list(**params). The list is not modified.
    params: keyword arguments for the final method.

    Prints the result as indented, key-sorted JSON. Prints an error and
    exits with status 1 on an HTTP error or a token refresh error.
    '''
    # every name but the last is a resource accessor that has to be called
    # to reach the next level; the last one is the API method itself.
    # Unpacking instead of pop(0) leaves the caller's list untouched.
    *resources, method = functions
    target = svc
    for name in resources:
        target = getattr(target, name)()
    api_call = getattr(target, method)
    try:
        result = api_call(**params).execute()
    except googleapiclient.errors.HttpError as err:
        print(f'API Call ERROR: {err}')
        sys.exit(1)
    except google.auth.exceptions.RefreshError as err:
        print(f'Token refresh ERROR: {err}')
        sys.exit(1)
    print(json.dumps(result,
                     sort_keys=True,
                     indent=2))
def main():
    '''
    script entry point: parse the arguments, obtain valid credentials,
    build the requested API call and run it.
    '''
    args = parse_args()
    creds = get_valid_credentials(args.credentials_file,
                                  args.debug,
                                  args.scope,
                                  args.force_refresh,
                                  args.cert,
                                  args.key)
    api_call = build_api_call(creds=creds,
                              cert=args.cert,
                              key=args.key,
                              call_service=args.call_service)
    # api_call is the (svc, functions, params) triple call_api expects
    call_api(*api_call)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment