Created
May 23, 2017 21:53
-
-
Save jomido/93940858a803327197314ceae8b31462 to your computer and use it in GitHub Desktop.
asyncio gcloud auth (Python 3.6)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Google Cloud auth via service account file | |
| """ | |
| # stdlib | |
| import datetime | |
| import time | |
| import typing | |
| # 3rd party | |
| import aiohttp | |
| import jwt | |
| # internal | |
| from http_tools import post | |
| from utils import auto, extract_json_fields, json_read | |
| # Annotation alias: a list of OAuth scope strings. | |
| ScopeList = typing.List[str] | |
| # Sent as the 'grant_type' form field by acquire_token(). | |
| JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer' | |
| # Seconds added to the JWT 'iat' claim to produce its 'exp' claim. | |
| GCLOUD_TOKEN_DURATION = 3600 | |
| # Message raised by Token when `project` differs from the service file. | |
| MISMATCH = "Project name passed to Token does not match service_file's " \ | |
| "project_id." | |
async def acquire_token(session: aiohttp.ClientSession,
                        service_data: dict,
                        scopes: ScopeList=None):
    """
    Exchange a signed JWT assertion for an access token.

    The assertion and the token URL both come from `generate_assertion`.
    The assertion is POSTed url-encoded to that URL, and the JSON reply is
    reduced to a dict with `access_token` (str) and `expires_in` (int).

    The HTTP status returned by `post` is not inspected; an error reply is
    detected by `extract_json_fields`, which raises when the body contains
    an 'error' key.
    """
    token_url, signed_jwt = generate_assertion(service_data, scopes)

    form_fields = {
        'grant_type': JWT_GRANT_TYPE,
        'assertion': signed_jwt
    }
    form_headers = {'content-type': 'application/x-www-form-urlencoded'}
    wanted_fields = (
        ('access_token', str),
        ('expires_in', int)
    )

    _status, body = await post(
        token_url,
        form_fields,
        headers=form_headers,
        timeout=60,
        urlencoded=True,
        json_response=True,
        session=session
    )

    return extract_json_fields(body, wanted_fields)
def generate_assertion(service_data: dict, scopes: ScopeList=None):
    """
    Build the signed JWT used to request an access token.

    Reads `token_uri`, `client_email` and `private_key` from `service_data`.

    Returns a `(token_uri, signed_jwt)` pair.
    """
    token_uri = service_data['token_uri']
    claims = make_gcloud_oauth_body(
        token_uri,
        service_data['client_email'],
        scopes
    )

    # RS256 <-- this means we need 240MB in additional dependencies...
    signed_jwt = jwt.encode(
        claims,
        service_data['private_key'],
        algorithm='RS256'
    )

    return token_uri, signed_jwt
def make_gcloud_oauth_body(uri: str, client_email: str, scopes: ScopeList):
    """
    Build the JWT claim set for a service-account token request.

    `uri` becomes the `aud` claim and `client_email` the `iss` claim.
    `scopes` is joined with single spaces into the `scope` claim. `iat` is
    the current Unix time and `exp` is `iat + GCLOUD_TOKEN_DURATION`.

    `scopes` may also be `None` (callers default it to `None`), which
    yields an empty `scope` claim rather than a `TypeError` from
    `str.join`. A single scope passed as a bare string is used as-is
    instead of being split into space-separated characters.
    """
    if scopes is None:
        scopes = ()
    elif isinstance(scopes, str):
        # joining a bare string would interleave spaces between characters
        scopes = (scopes,)

    now = int(time.time())

    return {
        'aud': uri,
        'exp': now + GCLOUD_TOKEN_DURATION,
        'iat': now,
        'iss': client_email,
        'scope': ' '.join(scopes),
    }
class Token(object):
    """
    Lazily acquired, automatically refreshed access token.

    `get()` returns the current access token. It acquires one first if none
    is held, and refreshes it once more than half of its reported lifetime
    has elapsed. While an acquisition is in flight, concurrent callers await
    the same future instead of starting another request.
    """

    def __init__(self, project: str, service_file: str,
                 session: aiohttp.ClientSession=None, scopes: ScopeList=None):
        """
        Load the service account file and prepare an empty token state.

        Raises AssertionError when `project` differs from the file's
        `project_id`.
        """
        self.project = project
        self.service_data = json_read(service_file)

        # sanity check; raised explicitly so it still runs under `python -O`
        if self.project != self.service_data['project_id']:
            raise AssertionError(MISMATCH)

        # NOTE(review): SCOPES is not defined in the visible source; passing
        # no scopes raises NameError unless it is defined elsewhere - confirm
        self.scopes = scopes or SCOPES

        # NOTE(review): a session created here is never closed by this class
        self.session = session or aiohttp.ClientSession()
        self.access_token = None
        self.access_token_duration = None
        self.access_token_acquired_at = None

        # future of the in-flight acquisition, or None when idle
        self.acquiring = None

    async def get(self):
        """Return a usable access token, acquiring or refreshing if needed."""
        await self.ensure_token()
        return self.access_token

    async def ensure_token(self):
        """Acquire a token if none is held or the held one is half spent."""
        if self.acquiring:
            # somebody else already started an acquisition; share it
            await self.acquiring
        elif not self.access_token:
            self.acquiring = self.acquire_access_token()
            await self.acquiring
        else:
            now = datetime.datetime.now()
            delta = (now - self.access_token_acquired_at).total_seconds()

            # refresh early, at half of the reported lifetime
            if delta > self.access_token_duration / 2:
                self.acquiring = self.acquire_access_token()
                await self.acquiring

    @auto
    async def acquire_access_token(self):
        """
        Request a new token and record it together with its lifetime.

        Because of `@auto`, calling this schedules it and returns a future.
        Returns True on success; errors from `acquire_token` propagate.
        """
        try:
            data = await acquire_token(
                self.session,
                self.service_data,
                self.scopes
            )

            self.access_token = data['access_token']
            self.access_token_duration = data['expires_in']
            self.access_token_acquired_at = datetime.datetime.now()
        finally:
            # Always clear the in-flight marker. If it stayed set after a
            # failure, every later ensure_token() would re-await the same
            # failed future and could never retry.
            self.acquiring = None

        return True
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import aiohttp | |
| from urllib.parse import urlencode, quote_plus | |
async def post(url, payload, timeout=60, urlencoded=False, json_response=True,
               session=None, headers=None):
    """
    POST `payload` to `url` and return `(status, content)`.

    payload:       mapping sent url-encoded when `urlencoded` is true,
                   otherwise serialized as UTF-8 JSON.
    timeout:       passed through to the session's `post`.
    json_response: when true the body is decoded with `response.json()`,
                   otherwise with `response.text()`.
    session:       session to use; when omitted a one-time
                   `aiohttp.ClientSession` is created and closed here.
    headers:       extra request headers. The dict is copied, so the
                   caller's object is never modified.
    """
    import inspect

    # work on a copy: the content-type/length set below must not leak
    # into a dict the caller may reuse
    headers = dict(headers or {})

    if urlencoded:
        payload = urlencode(payload, quote_via=quote_plus)
        headers['content-type'] = 'application/x-www-form-urlencoded'
    else:
        payload = json.dumps(payload)
        payload = payload.encode('utf-8')
        headers.update({
            'content-length': str(len(payload)),
            'content-type': 'application/json'
        })

    one_time_session = None
    if not session:
        one_time_session = aiohttp.ClientSession()

    try:
        s = session or one_time_session
        response = await s.post(
            url,
            data=payload,
            headers=headers,
            timeout=timeout
        )

        # read the body while the session (and its connection) is still open
        if json_response:
            content = await response.json()
        else:
            content = await response.text()  # untested API

        status = response.status
    finally:
        if one_time_session:
            # close() is a coroutine in some aiohttp versions and a plain
            # call in others; await it only when it gives us an awaitable
            closing = one_time_session.close()
            if inspect.isawaitable(closing):
                await closing

    return status, content
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import json | |
| from functools import wraps | |
| # asyncio utils | |
def maybeAsync(callable, *args, **kwargs):
    """
    Turn a callable into a coroutine if it isn't

    - a coroutine object is returned unchanged (args are ignored);
    - a coroutine function is called with the given args;
    - any other callable is wrapped so that it runs when the returned
      coroutine is awaited; if it returns an awaitable, that is awaited
      as well.

    This does not rely on `asyncio.coroutine`, which newer Python versions
    no longer provide.
    """
    import inspect

    if asyncio.iscoroutine(callable):
        return callable

    if inspect.iscoroutinefunction(callable):
        return callable(*args, **kwargs)

    async def _run_sync():
        # invoked lazily, i.e. only once the coroutine is actually run
        result = callable(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    return _run_sync()
def fire(callable, *args, **kwargs):
    """
    Start a callable as a coroutine, and return its future. Synchronous and
    asynchronous callables are both accepted: `maybeAsync` turns either one
    into a coroutine first, so calling code can treat them the same way.
    """
    coro = maybeAsync(callable, *args, **kwargs)
    return asyncio.ensure_future(coro)
def auto(fn):
    """
    Decorate a function or method with this, and it will become a callable
    that is scheduled in the event loop just by calling it. Normally you'd
    have to do an `asyncio.ensure_future(my_callable())`. Now you can just do
    `my_callable()`; the call returns whatever `fire` returns.
    """
    # kept as a plain function (not a partial) so it still binds `self`
    # when used to decorate methods
    def scheduled(*args, **kwargs):
        return fire(fn, *args, **kwargs)

    return wraps(fn)(scheduled)
| # json utils | |
def json_read(file_name: str):
    """
    Parse the JSON file at `file_name` and return the decoded value.

    The file is always decoded as UTF-8 rather than with the locale's
    default encoding, so the result does not depend on the platform.
    """
    with open(file_name, 'r', encoding='utf-8') as f:
        return json.load(f)
def extract_json_fields(content, spec):
    """
    Pick and convert selected fields out of a decoded JSON object.

    `spec` is an iterable of `(field, cast)` pairs; the result maps each
    field to `cast(content[field])`.

    Raises Exception (carrying the formatted content) when `content` has an
    'error' key, and KeyError when a requested field is missing.
    """
    if 'error' in content:
        raise Exception('{}'.format(content))

    extracted = {}
    for name, convert in spec:
        extracted[name] = convert(content[name])
    return extracted
Author
Author
^ the cryptography lib adds an additional ~240MB to container size (as of the timestamp of this comment).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The `jwt` lib depends on the `cryptography` lib. If you are building from a lightweight image (such as alpine), then you will need to install a few things for the `cryptography` lib: `RUN apk --update add build-base libffi-dev openssl-dev python-dev py-pip`.