Last active
May 31, 2022 13:41
-
-
Save av1m/a53f590c92ef8ca83d7b5bc3c7ef9faf to your computer and use it in GitHub Desktop.
Get an access token with a Google service account
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Get an access token with a service account\n", | |
"\n", | |
"I will present two ways to get an access token for a service account." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Install the dependencies\n", | |
"!pip install jwt google-auth google-api-python-client" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Manual way (not recommended)\n", | |
"\n", | |
"All the documentation is available [https://developers.google.com/identity/protocols/oauth2/service-account](here)\n", | |
"\n", | |
"The steps to get an access token are:\n", | |
"1. Retrieve the service account key JSON file from the Google Developers Console\n", | |
"2. Create the JWT claim set with the contents of the key JSON file and a scope value\n", | |
"3. Call Google API (https://oauth2.googleapis.com/token) to get an access token" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import json\n", | |
"import logging\n", | |
"import os\n", | |
"import time\n", | |
"\n", | |
"import jwt\n", | |
"\n", | |
"logger = logging.getLogger(__name__)\n", | |
"\n", | |
"class GoogleAuth:\n", | |
" \"\"\"Allow to get a valid access token\n", | |
"\n", | |
" Documentation:\n", | |
" https://developers.google.com/identity/protocols/oauth2/service-account#httprest\n", | |
"\n", | |
" Example::\n", | |
"\n", | |
" >>> gauth = GoogleAuth(\"credentials.json\", \"https://www.googleapis.com/auth/calendar.readonly\")\n", | |
" >>> gauth.get_access_token()\n", | |
" >>> gauth.token\n", | |
" \"\"\"\n", | |
"\n", | |
" def __init__(self, filename: str, scope: str, delegate_account: str = None):\n", | |
" \"\"\"Initialize the GoogleAuth object with a credentials file and a scope\n", | |
"\n", | |
" :param filename: Path to the service account json file, retrieved from the Google Developers Console\n", | |
" :type filename: str\n", | |
" :param scope: The scope of the access token\n", | |
" :type scope: str\n", | |
" :param delegate_account: The email of the account to impersonate (optional). If not provided, the service account email will be used.\n", | |
" :type delegate_account: str\n", | |
" \"\"\"\n", | |
" if not os.path.isfile(filename): # check that the file exists\n", | |
" raise Exception(\"File {} does not exist\".format(filename))\n", | |
" self.filename = filename\n", | |
" self.scope = scope\n", | |
" self.delegate_account = delegate_account\n", | |
" self.jwt = self.get_jwt()\n", | |
" self.token = None\n", | |
"\n", | |
" def get_jwt(self):\n", | |
" \"\"\"Get a JWT from the service account json file\n", | |
"\n", | |
" The information to how to get a JWT is available at:\n", | |
" https://developers.google.com/identity/protocols/oauth2/service-account#httprest\n", | |
"\n", | |
" This function don't call the Google API, it only read the json file.\n", | |
" \n", | |
" :return: The JWT as a string\n", | |
" :rtype: str\n", | |
" \"\"\"\n", | |
" service_account = json.loads(open(self.filename, \"rb\").read())\n", | |
" iat = time.time()\n", | |
"\n", | |
" payload = {\n", | |
" \"iss\": service_account.get(\"client_email\"),\n", | |
" \"sub\": self.delegate_account or service_account.get(\"client_email\"),\n", | |
" \"aud\": \"https://oauth2.googleapis.com/token\",\n", | |
" \"scope\": self.scope,\n", | |
" \"iat\": iat,\n", | |
" \"exp\": iat + 3600,\n", | |
" }\n", | |
" return jwt.encode(\n", | |
" payload=payload,\n", | |
" key=service_account.get(\"private_key\"),\n", | |
" headers={\"kid\": service_account.get(\"private_key_id\")},\n", | |
" algorithm=\"RS256\",\n", | |
" )\n", | |
"\n", | |
" def get_access_token(self):\n", | |
" \"\"\"Get a valid access token\n", | |
"\n", | |
" This function call the Google API to get a valid access token.\n", | |
" If the token is expired, it will'nt be refreshed.\n", | |
" \n", | |
" :return: The access token as a string\n", | |
" :rtype: str\n", | |
" \"\"\"\n", | |
" if not self.jwt:\n", | |
" raise Exception(\"Can't proceed without a JWT\")\n", | |
" import requests\n", | |
"\n", | |
" response = requests.post(\n", | |
" url=\"https://oauth2.googleapis.com/token\",\n", | |
" headers={\"Content-Type\": \"application/x-www-form-urlencoded\"},\n", | |
" data={\n", | |
" \"grant_type\": \"urn:ietf:params:oauth:grant-type:jwt-bearer\",\n", | |
" \"assertion\": self.jwt,\n", | |
" },\n", | |
" )\n", | |
" if not response.ok:\n", | |
" logger.error(response.text)\n", | |
" response.raise_for_status()\n", | |
" self.token = response.json().get(\"access_token\")\n", | |
" return self.token" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"gauth = GoogleAuth(\"credentials.json\", \"https://www.googleapis.com/auth/calendar.readonly\")\n", | |
"gauth.get_access_token()\n", | |
"print(gauth.token)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Automatic way (recommanded)\n", | |
"\n", | |
"Because the logic and cryptographic details of the process are complex, Google recommends that you use the library way to get an access token." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 80, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from google.oauth2 import service_account\n", | |
"\n", | |
"scopes = ['https://www.googleapis.com/auth/calendar.readonly']\n", | |
"filename = 'credentials.json'\n", | |
"\n", | |
"credentials = service_account.Credentials.from_service_account_file(filename=filename, scopes=scopes)\n", | |
"# credentials = credentials.with_subject('[email protected]') # optional (in case you need to delegate a user)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"The recovery of the access token can seem strange, but we must use the `build()` method and make a call to obtain an `access token`." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 81, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import googleapiclient.discovery\n", | |
"\n", | |
"# We'll use the Oauth2 API for this example.\n", | |
"client = googleapiclient.discovery.build('oauth2', 'v2', credentials=credentials)\n", | |
"# Ask for a token\n", | |
"client.tokeninfo().execute()\n", | |
"print(credentials.token)" | |
] | |
} | |
], | |
"metadata": { | |
"interpreter": { | |
"hash": "7fb69ec69fe00c8a6e21064408ad6b5048c964396efeff3292e27c59d9a7ee10" | |
}, | |
"kernelspec": { | |
"display_name": "Python 3.10.1 ('.venv': venv)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.10.1" | |
}, | |
"orig_nbformat": 4 | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment