Skip to content

Instantly share code, notes, and snippets.

@jomido
Created May 23, 2017 21:53
Show Gist options
  • Save jomido/93940858a803327197314ceae8b31462 to your computer and use it in GitHub Desktop.
Save jomido/93940858a803327197314ceae8b31462 to your computer and use it in GitHub Desktop.
asyncio gcloud auth (Python 3.6)
"""
Google Cloud auth via service account file
"""
# stdlib
import datetime
import time
import typing
# 3rd party
import aiohttp
import jwt
# internal
from http_tools import post
from utils import auto, extract_json_fields, json_read
# Type alias for a list of OAuth2 scope strings.
ScopeList = typing.List[str]

# Grant type sent alongside the signed JWT assertion.
JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'

# Lifetime, in seconds, added to `iat` to form the assertion's `exp` claim.
GCLOUD_TOKEN_DURATION = 3600

# Assertion message used when Token's project and the service file disagree.
MISMATCH = (
    "Project name passed to Token does not match service_file's "
    "project_id."
)
async def acquire_token(session: aiohttp.ClientSession,
                        service_data: dict,
                        scopes: ScopeList=None):
    """
    Exchange a signed JWT assertion for an access token.

    The assertion is generated from `service_data` and `scopes`, then POSTed
    url-encoded over `session` to the URL returned by `generate_assertion`.

    Returns a dict with 'access_token' (cast to str) and 'expires_in' (cast
    to int) extracted from the JSON reply. The HTTP status returned by
    `post` is not inspected here.
    """
    token_uri, signed_jwt = generate_assertion(service_data, scopes)
    form = {'grant_type': JWT_GRANT_TYPE, 'assertion': signed_jwt}
    wanted_fields = (('access_token', str), ('expires_in', int))

    _status, body = await post(
        token_uri,
        form,
        headers={'content-type': 'application/x-www-form-urlencoded'},
        timeout=60,
        urlencoded=True,
        json_response=True,
        session=session,
    )
    return extract_json_fields(body, wanted_fields)
def generate_assertion(service_data: dict, scopes: ScopeList=None):
    """
    Build the signed JWT assertion for a service account.

    Reads 'token_uri', 'client_email' and 'private_key' from `service_data`,
    builds the claim set with `make_gcloud_oauth_body`, and signs it with
    RS256.

    Returns a `(token_uri, signed_jwt)` tuple.
    """
    token_uri = service_data['token_uri']
    claims = make_gcloud_oauth_body(
        token_uri,
        service_data['client_email'],
        scopes,
    )
    # RS256 is the reason for the ~240MB of additional dependencies.
    signed_jwt = jwt.encode(
        claims,
        service_data['private_key'],
        algorithm='RS256',
    )
    return token_uri, signed_jwt
def make_gcloud_oauth_body(uri: str, client_email: str, scopes: ScopeList):
    """
    Assemble the JWT claim set for the token request.

    `iat` is the current Unix time (whole seconds) and `exp` is `iat` plus
    GCLOUD_TOKEN_DURATION. `scopes` is joined with single spaces, so it must
    be an iterable of strings; passing None raises TypeError.
    """
    issued_at = int(time.time())
    expires_at = issued_at + GCLOUD_TOKEN_DURATION
    scope_claim = ' '.join(scopes)
    return dict(
        aud=uri,
        exp=expires_at,
        iat=issued_at,
        iss=client_email,
        scope=scope_claim,
    )
class Token(object):
    """
    Lazily acquired, self-refreshing access token for a service account.

    The token is fetched on first use and re-fetched once more than half of
    its reported `expires_in` has elapsed. While a fetch is in flight,
    concurrent callers await the same future instead of starting another.
    """

    def __init__(self, project: str, service_file: str,
                 session: aiohttp.ClientSession=None, scopes: ScopeList=None):
        """
        Load the service account file and prepare (but do not fetch) a token.

        project: expected project id; must equal the file's 'project_id'.
        service_file: path to the service account JSON file.
        session: aiohttp session to use; one is created when omitted.
        scopes: OAuth2 scopes to request.

        Raises AssertionError (message MISMATCH) when the project ids differ.
        """
        self.project = project
        self.service_data = json_read(service_file)

        # sanity check (note: assert statements are skipped under `python -O`)
        assert self.project == self.service_data['project_id'], MISMATCH

        # NOTE(review): SCOPES is not defined in the visible source; confirm
        # a module-level default exists, otherwise omitting `scopes` raises
        # NameError here.
        self.scopes = scopes or SCOPES
        self.session = session or aiohttp.ClientSession()

        self.access_token = None
        self.access_token_duration = None
        self.access_token_acquired_at = None
        # Future for the in-flight acquisition, or None when idle.
        self.acquiring = None

    async def get(self):
        """Return a current access token, acquiring or refreshing if needed."""
        await self.ensure_token()
        return self.access_token

    async def ensure_token(self):
        """
        Make sure `self.access_token` is populated and not past half-life.

        Any exception raised by the acquisition propagates to the caller.
        """
        if self.acquiring:
            # Someone else already started a fetch; share its outcome.
            await self.acquiring
            return

        if self.access_token:
            now = datetime.datetime.now()
            age = (now - self.access_token_acquired_at).total_seconds()
            if age <= self.access_token_duration / 2:
                return

        self.acquiring = self.acquire_access_token()
        await self.acquiring

    @auto
    async def acquire_access_token(self):
        """
        Fetch a new token and record it together with its acquisition time.

        Because of `@auto`, calling this schedules the work and returns a
        future. Returns True on success.
        """
        try:
            data = await acquire_token(
                self.session,
                self.service_data,
                self.scopes
            )
            self.access_token = data['access_token']
            self.access_token_duration = data['expires_in']
            self.access_token_acquired_at = datetime.datetime.now()
        finally:
            # Always clear the in-flight marker. Otherwise a failed or
            # cancelled acquisition would leave a dead future behind that
            # every later ensure_token() call re-awaits, so no retry could
            # ever happen.
            self.acquiring = None
        return True
import json
import aiohttp
from urllib.parse import urlencode, quote_plus
async def _post_and_read(session, url, payload, headers, timeout,
                         json_response):
    """Send the POST on `session` and read the body before releasing it."""
    async with session.post(
        url,
        data=payload,
        headers=headers,
        timeout=timeout
    ) as response:
        if json_response:
            content = await response.json()
        else:
            content = await response.text()  # untested API
        return response.status, content


async def post(url, payload, timeout=60, urlencoded=False, json_response=True,
               session=None, headers=None):
    """
    POST `payload` to `url` and return `(status, content)`.

    payload: mapping to send; form-encoded when `urlencoded` is true,
        otherwise serialised as UTF-8 JSON.
    timeout: passed through to the aiohttp request.
    json_response: decode the reply as JSON when true, as text otherwise.
    session: aiohttp session to use; a one-time session is created (and
        closed afterwards) when omitted.
    headers: extra request headers; the caller's dict is not modified.
    """
    # Copy so that setting content-type/length never leaks into the
    # caller's dict.
    headers = dict(headers or {})

    if urlencoded:
        payload = urlencode(payload, quote_via=quote_plus)
        headers['content-type'] = 'application/x-www-form-urlencoded'
    else:
        payload = json.dumps(payload).encode('utf-8')
        headers.update({
            'content-length': str(len(payload)),
            'content-type': 'application/json'
        })

    if session:
        return await _post_and_read(
            session, url, payload, headers, timeout, json_response
        )

    # The body must be read while the session is still open, so the read
    # happens inside the `async with` that owns the one-time session.
    async with aiohttp.ClientSession() as one_time_session:
        return await _post_and_read(
            one_time_session, url, payload, headers, timeout, json_response
        )
import asyncio
import json
from functools import wraps
# asyncio utils
def maybeAsync(callable, *args, **kwargs):
    """
    Turn a callable into a coroutine if it isn't

    - A coroutine object is returned unchanged (args/kwargs are ignored).
    - A coroutine function is called with the given arguments.
    - Any other callable is wrapped: it is invoked when the returned
      coroutine runs, and an awaitable result is awaited.

    Does not rely on `asyncio.coroutine`, which was removed in Python 3.11.
    """
    import inspect

    if asyncio.iscoroutine(callable):
        return callable
    if inspect.iscoroutinefunction(callable):
        return callable(*args, **kwargs)

    async def _call_later():
        result = callable(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    return _call_later()
def fire(callable, *args, **kwargs):
    """
    Start a callable as a coroutine, and return its future.

    Because the callable first goes through maybeAsync, synchronous and
    asynchronous callables can be treated the same way (both as async),
    which simplifies calling code.
    """
    coroutine = maybeAsync(callable, *args, **kwargs)
    return asyncio.ensure_future(coroutine)
def auto(fn):
    """
    Decorator: make a function or method schedule itself when called.

    Normally you'd have to do an `asyncio.ensure_future(my_callable())`.
    Now you can just do `my_callable()` and get the future back. Twisted has
    always let you do this, and now asyncio can too (with a decorator,
    albeit...)
    """
    def scheduled(*call_args, **call_kwargs):
        future = fire(fn, *call_args, **call_kwargs)
        return future

    return wraps(fn)(scheduled)
# json utils
def json_read(file_name: str):
    """
    Read `file_name` and return its parsed JSON content.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    locale encoding, so non-ASCII content parses the same everywhere.
    """
    with open(file_name, 'r', encoding='utf-8') as f:
        return json.load(f)
def extract_json_fields(content, spec):
    """
    Pull and cast selected fields out of a decoded JSON response.

    `spec` is an iterable of `(field_name, cast)` pairs; the result maps
    each field name to `cast(content[field_name])`.

    Raises Exception (carrying the formatted content) when 'error' is found
    in `content`, and KeyError when a requested field is absent.
    """
    if 'error' in content:
        raise Exception('{}'.format(content))

    extracted = {}
    for name, convert in spec:
        extracted[name] = convert(content[name])
    return extracted
@jomido
Copy link
Author

jomido commented May 29, 2017

The jwt lib depends on the cryptography lib. If you are building from a lightweight image (such as alpine), then you will need to install a few things for the cryptography lib: RUN apk --update add build-base libffi-dev openssl-dev python-dev py-pip.

@jomido
Copy link
Author

jomido commented May 29, 2017

^ the cryptography lib adds an additional ~240MB to container size (as of the timestamp of this comment).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment