Do: pip install -r requirements.txt
Do: python taskqueue.py
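You will need a Google Cloud service account JSON key file. Set its path (the `service_file` variable) in the `if __name__ == "__main__":` entry point at the bottom of taskqueue.py; the `project` set there must match the key file's `project_id`.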
| """ | |
| Google Cloud auth via service account file | |
| """ | |
| # stdlib | |
| import datetime | |
| import threading | |
| import time | |
| # 3rd party | |
| import jwt | |
| # internal | |
| from methods import post | |
| from utils import extract_json_fields, json_read | |
# Value sent as the 'grant_type' form field when requesting a token.
JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'

# Seconds added to 'iat' to obtain the 'exp' claim of the signed assertion.
GCLOUD_TOKEN_DURATION = 3600

# Message attached to the project sanity check in Token.__init__.
MISMATCH = (
    "Project name passed to Token does not match service_file's "
    "project_id."
)
def acquire_token(service_data, scopes):
    """
    Exchange a signed JWT assertion for an access token.

    service_data -- parsed service file; passed through to
                    generate_assertion, which reads 'token_uri',
                    'client_email' and 'private_key' from it
    scopes       -- iterable of scope strings to request

    Returns a dict with 'access_token' (str) and 'expires_in' (int).
    Raises whatever extract_json_fields raises, e.g. when the response
    contains an 'error' entry.
    """
    token_url, signed_jwt = generate_assertion(service_data, scopes)
    form_fields = (
        ('grant_type', JWT_GRANT_TYPE),
        ('assertion', signed_jwt),
    )

    # The HTTP status is not inspected; failures are detected from the
    # response body by extract_json_fields.
    _status, body = post(
        token_url,
        form_fields,
        timeout=60,
        urlencoded=True,
        json_response=True,
    )

    wanted = (
        ('access_token', str),
        ('expires_in', int),
    )
    return extract_json_fields(body, wanted)
def generate_assertion(service_data, scopes):
    """
    Build and sign the JWT assertion for a token request.

    Returns a (token_uri, signed_jwt) pair, where token_uri is taken
    from service_data and is also used as the 'aud' claim.
    """
    token_uri = service_data['token_uri']
    claims = make_gcloud_oauth_body(
        token_uri,
        service_data['client_email'],
        scopes
    )
    # RS256 is the reason for the heavy extra dependencies (the original
    # author's note puts them at roughly 240MB).
    signed = jwt.encode(
        claims,
        service_data['private_key'],
        algorithm='RS256'
    )
    return token_uri, signed
def make_gcloud_oauth_body(uri, client_email, scopes):
    """
    Return the claim set (a dict) to be signed as the JWT assertion.

    uri          -- used as the 'aud' claim
    client_email -- used as the 'iss' claim
    scopes       -- iterable of scope strings, joined with single spaces

    'iat' is the current time in whole seconds and 'exp' lies
    GCLOUD_TOKEN_DURATION seconds after it.
    """
    issued_at = int(time.time())
    expires = issued_at + GCLOUD_TOKEN_DURATION
    joined_scopes = ' '.join(scopes)
    return dict(
        aud=uri,
        exp=expires,
        iat=issued_at,
        iss=client_email,
        scope=joined_scopes,
    )
class Token(object):
    """
    Cached access token for one project, backed by a service file.

    get() returns the cached token, fetching a new one first when none
    is cached yet or when less than half of the reported lifetime is
    left. Refreshes are serialized through acquire_lock.
    """

    def __init__(self, project, service_file, scopes=None):
        """
        project      -- project name; must equal the service file's
                        'project_id'
        service_file -- path to the JSON service file
        scopes       -- list of scope strings (defaults to an empty list)

        Raises AssertionError when project and 'project_id' differ.
        """
        self.project = project
        self.service_data = json_read(service_file)

        # Sanity check. Raised explicitly instead of via `assert` so the
        # check is not stripped under `python -O`; the exception type is
        # kept as AssertionError for existing callers.
        if not self.project == self.service_data['project_id']:
            raise AssertionError(MISMATCH)

        self.scopes = scopes or []

        self.token = None          # current access token string
        self.expires_at = None     # naive local datetime of expiry
        self.expire_time = None    # lifetime in seconds, as reported
        self.acquire_lock = threading.Lock()

    def get(self):
        """
        Return a usable access token, refreshing it first if needed.
        """
        self.ensure_token()
        return self.token

    def _needs_renewal(self):
        """
        Return True when less than half of the token lifetime remains.
        """
        remaining = (self.expires_at - datetime.datetime.now()).total_seconds()
        return remaining < self.expire_time / 2

    def ensure_token(self):
        """
        Fetch a new token when none is cached or the cached one is past
        half of its lifetime. Holds acquire_lock for the whole operation.
        """
        with self.acquire_lock:
            if self.token and not self._needs_renewal():
                return

            token_data = acquire_token(
                self.service_data,
                self.scopes
            )
            # Read both fields before touching any attribute so a
            # malformed response cannot leave half-updated state behind.
            access_token = token_data['access_token']
            lifetime = token_data['expires_in']

            self.expire_time = lifetime
            self.expires_at = (
                datetime.datetime.now() +
                datetime.timedelta(seconds=lifetime)
            )
            self.token = access_token

    def acquire_access_token(self):
        """
        Fetch a token and store it under the access_token* attributes.

        Note: this does not take acquire_lock and does not update the
        token / expires_at / expire_time attributes that get() and
        ensure_token() use. Always returns True.
        """
        data = acquire_token(
            self.service_data,
            self.scopes
        )

        self.access_token = data['access_token']
        self.access_token_duration = data['expires_in']
        self.access_token_acquired_at = datetime.datetime.now()
        self.acquiring = None

        return True
| import json | |
| from functools import partial | |
| try: | |
| # Python 3 | |
| from urllib.parse import urlencode, quote_plus | |
| url_encode = partial(urlencode, quote_via=quote_plus) | |
| except ImportError: | |
| # Python 2 | |
| from urllib import urlencode | |
| url_encode = urlencode | |
| import requests | |
def post(url, payload, timeout=60, urlencoded=False, json_response=True,
         headers=None):
    """
    POST payload to url and return (status_code, content).

    url           -- target URL
    payload       -- form fields when urlencoded is true, otherwise any
                     JSON-serializable object; a falsy payload is passed
                     to requests unchanged
    timeout       -- seconds, handed to requests.post
    urlencoded    -- send as application/x-www-form-urlencoded instead
                     of application/json
    json_response -- parse the response body as JSON; otherwise return
                     it as text
    headers       -- optional extra headers; the dict is not modified

    Exceptions from requests.post and response.json propagate.
    """
    # Work on a copy: the content-type / content-length entries set below
    # must not leak into the dict owned by the caller.
    headers = dict(headers or {})

    if urlencoded:
        if payload:
            payload = url_encode(payload)
        headers.update({
            'content-type': 'application/x-www-form-urlencoded'
        })
    else:
        if payload:
            payload = json.dumps(payload).encode('utf-8')
            content_length = str(len(payload))
        else:
            content_length = '0'
        headers.update({
            'content-length': content_length,
            'content-type': 'application/json'
        })

    response = requests.post(
        url,
        data=payload,
        headers=headers,
        timeout=timeout
    )

    if json_response:
        content = response.json()
    else:
        content = response.text

    return response.status_code, content
def get(url, params=None, headers=None, json_response=True, timeout=60):
    """
    GET url and return (status_code, content).

    url           -- target URL
    params        -- optional query parameters
    headers       -- optional request headers
    json_response -- parse the response body as JSON; otherwise return
                     it as text
    timeout       -- seconds, handed to requests.get. Defaults to 60,
                     matching post(); without a timeout requests may
                     wait on an unresponsive server indefinitely.

    Exceptions from requests.get and response.json propagate.
    """
    headers = headers or {}
    params = params or {}

    response = requests.get(
        url,
        params=params,
        headers=headers,
        timeout=timeout
    )

    if json_response:
        content = response.json()
    else:
        content = response.text

    return response.status_code, content
| cryptography | |
| PyJWT | |
| requests | |
| ujson |
| """ | |
| An asynchronous queue for Google Appengine Task Queues | |
| """ | |
| import datetime | |
| import logging | |
| from auth import Token | |
| # internal | |
| from methods import post | |
| from utils import clean_b64encode | |
| import ujson | |
# Module logger, with its level pinned to INFO.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

# Base URL that TASK_QUEUE_URL is formatted with.
API_ROOT = 'https://www.googleapis.com/taskqueue/v1beta2/projects'

# Scopes requested when TaskQueue has to create its own Token.
SCOPES = [
    'https://www.googleapis.com/auth/taskqueue',
    'https://www.googleapis.com/auth/taskqueue.consumer',
    'https://www.googleapis.com/auth/cloud-taskqueue',
    'https://www.googleapis.com/auth/cloud-taskqueue.consumer',
]

# Template for the tasks collection URL of one queue.
TASK_QUEUE_URL = '{api_root}/s~{project_name}/taskqueues/{queue_name}/tasks'
def make_insert_body(queue_name, payload):
    """
    Build the request body for inserting one task.

    queue_name -- stored under 'queueName'
    payload    -- object serialized with ujson and then encoded with
                  clean_b64encode into 'payloadBase64'

    Returns the body as a dict.
    """
    # Local import: `time` is not among this module's top-level imports.
    import time

    # time.time() counts seconds since the Unix epoch independent of the
    # host timezone. Subtracting the epoch from naive local
    # datetime.now() would skew the value by the local UTC offset.
    micro_sec_since_epoch = int(time.time() * 1000000)

    encoded_payload = clean_b64encode(ujson.dumps(payload))

    return {
        "kind": "taskqueues#task",
        "queueName": queue_name,
        "payloadBase64": encoded_payload,
        "enqueueTimestamp": micro_sec_since_epoch,
        "leaseTimestamp": 0,
        "retry_count": 0
    }
class TaskQueue(object):
    """
    A Google Task Queue via requests
    """

    def __init__(self, queue_name, project=None, service_file=None,
                 token=None):
        """
        queue_name   -- name of the queue, used in the tasks URL
        project      -- project name, used in the tasks URL and for the
                        Token created when none is supplied
        service_file -- path to the service file; only read when no
                        token is supplied
        token        -- optional object with a get() method returning
                        the bearer token string
        """
        if not token:
            token = Token(project, service_file, scopes=SCOPES)

        self.project = project
        self.queue_name = queue_name
        self.service_file = service_file
        self.token = token

        self.api_root = API_ROOT
        self.url = TASK_QUEUE_URL.format(
            api_root=API_ROOT,
            project_name=project,
            queue_name=queue_name
        )

        self.default_header = {
            'content-length': '0',
            'accept': 'application/json',
            'Authorization': ''
        }

    def headers(self, override=None):
        """
        return the default headers for any http call

        Entries in override replace the defaults; 'Authorization' is
        always set last from the current token. A new dict is returned
        on every call.
        """
        token = self.token.get()

        header = dict(self.default_header)
        header.update(override or {})
        header['Authorization'] = 'Bearer {}'.format(token)

        return header

    def insert_task(self, payload, id=None, tag=''):
        """
        Insert one task into the queue.

        payload -- dict to enqueue; it is not modified
        id      -- accepted but currently unused
        tag     -- when non-empty, sent as payload['tag']

        Returns True when the response status is 2xx, otherwise logs
        the response content and returns False.
        """
        if tag:
            # Copy first so the tag does not leak into the caller's dict.
            payload = dict(payload)
            payload['tag'] = tag

        body = make_insert_body(self.queue_name, payload)

        status, content = post(
            self.url,
            body,
            headers=self.headers()
        )

        success = 200 <= status < 300

        if not success:
            log.error("NO INSERTO: {}".format(content))

        return success
if __name__ == "__main__":
    # Manual smoke test: insert one task into a pull queue. Adjust the
    # project, queue name and service file path for your environment.
    tq = TaskQueue(
        'test-pull',
        project='talkiq-integration',
        service_file='/opt/service-integration.json'
    )

    result = tq.insert_task({'some_key': 'some_value'})

    assert result is True, 'result is {}'.format(result)
| import base64 | |
| import json | |
def json_read(file_name):
    """
    Read file_name and return its parsed JSON content.

    The file is decoded as UTF-8 rather than with the platform default
    encoding, so the result does not depend on the host locale.
    Errors from opening the file or parsing the JSON propagate.
    """
    # Local import; io.open accepts `encoding` on both Python 2 and 3.
    import io

    with io.open(file_name, 'r', encoding='utf-8') as f:
        return json.load(f)
def extract_json_fields(content, spec):
    """
    Pick and convert selected fields from a decoded JSON response.

    content -- the decoded response (a mapping)
    spec    -- iterable of (field, cast) pairs; each cast is called on
               content[field]

    Returns a dict mapping each field to its converted value.
    Raises Exception, carrying the formatted content, when content has
    an 'error' entry.
    """
    if 'error' in content:
        raise Exception('{}'.format(content))

    extracted = {}
    for field, cast in spec:
        extracted[field] = cast(content[field])
    return extracted
def clean_b64encode(payload):
    """
    Return payload base64-encoded with the URL-safe alphabet, as text.

    payload -- bytes, or text that is first encoded as UTF-8

    '-' and '_' take the place of '+' and '/'; '=' padding is kept.
    """
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')

    # urlsafe_b64encode performs exactly the '+' -> '-' and '/' -> '_'
    # substitution, so no manual replace calls are needed.
    return base64.urlsafe_b64encode(payload).decode('utf-8')