import datetime
import logging
import struct
import sys
import os

sys.path.append("lib")

import httplib2
from googleapiclient import http
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials

logger = logging.getLogger()

GOOGLEAPI_AUTH_URL = 'https://www.googleapis.com/auth/'

DEBUG = False
http_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "responses")
if DEBUG and not os.path.exists(http_path):
    os.makedirs(http_path)


def dparse(data, path):
    x = path.split("/")
    for k in x:
        if k:
            data = data.get(k, {})
    if data:
        return data
    return None
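

# A minimal sketch of how dparse is used further down in this file: it walks a
# nested dict with a '/'-separated path and returns None when nothing is found.
# The sample payload here is made up for illustration.
def _dparse_example():
    job = {"status": {"state": "DONE"}, "jobReference": {"jobId": "job_123"}}
    assert dparse(job, "/status/state") == "DONE"
    assert dparse(job, "/jobReference/jobId") == "job_123"
    assert dparse(job, "/status/errorResult") is None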
class HttpRecord(httplib2.Http):
    def request(self, uri, *args, **kwargs):
        resp, content = super().request(uri, *args, **kwargs)
        filename = os.path.join(http_path, f'{datetime.datetime.now()}-{uri.replace("/", "-")}-{kwargs.get("method")}')
        with open(filename, "wb") as f:
            f.write(content)
        return resp, content


def credentials(name, json="secret.json", scope=None):
    if scope is None:
        scope = GOOGLEAPI_AUTH_URL + name
    elif '://' in scope:
        assert GOOGLEAPI_AUTH_URL in scope, scope
    else:
        scope = GOOGLEAPI_AUTH_URL + scope

    cred = ServiceAccountCredentials.from_json_keyfile_name(json, scope)
    assert cred.invalid is False
    return cred


def service(name, json="secret.json", scope=None, v=None):
    v = v or ('v2' if name == 'drive' else 'v1')
    cred = credentials(name, json, scope=scope)
    http = httplib2.Http()
    if DEBUG:
        http = HttpRecord()
    return build(name, v, http=cred.authorize(http))
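

# A hedged example of wiring up clients with the helpers above. It assumes a
# service-account key file named "secret.json" sits next to this module (the
# default used throughout this gist).
def _service_example():
    # Cloud Storage client with the devstorage.read_write scope, as used by the
    # cs_* helpers below.
    storage = service("storage", scope="devstorage.read_write")
    # BigQuery client; the discovery version is v2, as used by the bq_* helpers.
    bigquery = service("bigquery", v="v2")
    return storage, bigquery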
def cs_put_object(file, bucket, key, mimetype="application/octet-stream", check_size=True, gs=None):
    """Uploads a file to google cloud storage. Retries once automatically if 503 returned"""
    gs = gs or service("storage", scope="devstorage.read_write")
    logger.info("[upload_cloud_store] Uploading %s to %s" % (key, bucket))

    if check_size:
        exists, data = cs_get_object_info(bucket, key)
        # I'd like to use md5 for this but not sure how google's calculating it
        # u'md5Hash': u'n2j1RoJz0ewlq7khTTCdwg==', ??? maybe base64?
        if exists and data["size"] == check_size:
            logger.info("[upload_cloud_store] Skipping upload for %s" % (key, ))
            return data

    upload = http.MediaIoBaseUpload(file, mimetype=mimetype, resumable=True)

    resp = gs.objects().insert(
        bucket=bucket,
        name=key,
        media_body=upload
    ).execute()

    logger.debug(resp)
    return resp
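

# A hedged usage sketch for cs_put_object. The bucket and key are hypothetical;
# check_size is passed as the expected byte count so a re-run can skip the
# upload when an object of that size is already in place.
def _cs_put_object_example(path="example.csv", bucket="my-bucket", key="uploads/example.csv"):
    size = os.path.getsize(path)
    with open(path, "rb") as fh:
        return cs_put_object(fh, bucket, key, mimetype="text/csv", check_size=size)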
def cs_get_object(bucket, key, gs=None, tracer=logger.info, filename=None):
    import io
    gs = gs or service("storage", scope="devstorage.read_write")

    # https://cloud.google.com/storage/docs/json_api/v1/objects/get
    # Get Metadata
    req = gs.objects().get(
        bucket=bucket,
        object=key)
    resp = req.execute()

    # Get Payload Data
    req = gs.objects().get_media(
        bucket=bucket,
        object=key)

    # The BytesIO object may be replaced with any io.Base instance.
    if not filename:
        fh = io.BytesIO()
    else:
        fh = open(filename, "wb")

    downloader = http.MediaIoBaseDownload(fh, req, chunksize=1024 * 1024)

    done = False
    while not done:
        status, done = downloader.next_chunk()
        if status:
            tracer('Download %d%%.' % int(status.progress() * 100))
    tracer('Download Complete!')

    if filename:
        fh.close()
        fh = open(filename, "rb")
    else:
        fh.seek(0)

    return fh
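

# A hedged usage sketch for cs_get_object. With no filename the payload comes
# back as an in-memory BytesIO; with a filename it is streamed to disk and an
# open handle to that file is returned. The names here are hypothetical.
def _cs_get_object_example(bucket="my-bucket", key="uploads/example.csv"):
    buf = cs_get_object(bucket, key)  # BytesIO holding the object contents
    data = buf.read()
    fh = cs_get_object(bucket, key, filename="/tmp/example.csv")  # streamed to disk
    fh.close()
    return data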
def cs_get_object_info(bucket, key, gs=None):
    gs = gs or service("storage", scope="devstorage.read_write")
    req = gs.objects().get(
        bucket=bucket,
        object=key)
    try:
        resp = req.execute()
        resp["size"] = int(resp["size"])
        return True, resp
    except http.HttpError as he:
        if "404" in str(he):
            return False, {}
        else:
            logger.error(he)
            raise


def cs_delete_object(bucket, key, gs=None):
    gs = gs or service("storage", scope="devstorage.read_write")
    logger.info("Deleting %s from %s" % (key, bucket))
    req = gs.objects().delete(
        bucket=bucket,
        object=key
    )
    resp = req.execute()
    # Success returns empty string: ''
    return resp
def bq_job_status(jobId, projectId="whois-980", gs=None):
    """Checks the status of a BigQuery job, such as one returned by bq_export, load, or an asynchronous query."""
    gs = gs or service("bigquery", v="v2")
    resp = gs.jobs().get(projectId=projectId, jobId=jobId).execute()
    return resp


def bq_load(gs_filename, destination, projectId="whois-980",
            datasetId='whois2015', fields=[], gs=None, **kwargs):
    gs = gs or service("bigquery", v="v2")

    if type(gs_filename) != list:
        gs_filename = [gs_filename]

    if type(fields) != list:
        raise ValueError("Fields is not a list!")

    logger.info("[bq_load] Loading %s into %s %s" % (gs_filename, destination,
                                                     ",".join(["%s:%s" % (k, v)
                                                               for k, v in kwargs.items()]) if kwargs else ""))

    field_names = []
    for f in fields:
        r = f.split(":")
        field_type = 'STRING'
        if len(r) > 1:
            field_type = r[1]
        field_names.append({"name": r[0], "type": field_type, "mode": 'NULLABLE'})

    # https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
    body = {
        "configuration": {  # [Required] Describes the job configuration.
            "load": {  # [Pick one] Configures a load job.
                "destinationTable": {  # [Required] The destination table to load the data into.
                    "projectId": projectId,  # [Required] The ID of the project containing this table.
                    "tableId": destination,  # [Required] The ID of the table. Letters, numbers, and underscores only; maximum length 1,024 characters.
                    "datasetId": datasetId,  # [Required] The ID of the dataset containing this table.
                },
                # [Required] Fully-qualified URIs pointing to the data in Google Cloud Storage.
                # Each URI can contain one '*' wildcard, which must come after the bucket name.
                "sourceUris": gs_filename,
                # [Optional] The schema for the destination table. Can be omitted if the table
                # already exists or the schema can be inferred from the loaded data.
                "schema": {
                    "fields": field_names,
                },
            },
        },
    }

    body["configuration"]["load"].update(kwargs)

    resp = gs.jobs().insert(
        projectId=projectId,
        body=body
    ).execute()

    logger.debug(resp)
    logger.info("[bq_load] Started job %s" % resp["jobReference"]["jobId"])
    return resp["jobReference"]["jobId"]
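

# A hedged usage sketch for bq_load. The gs:// URI, table, and dataset names are
# hypothetical. Fields use the "name:TYPE" shorthand parsed above (TYPE defaults
# to STRING), and extra keyword arguments are merged into the load configuration,
# e.g. sourceFormat or writeDisposition from the jobs.load reference.
def _bq_load_example():
    job_id = bq_load(
        "gs://my-bucket/uploads/example-*.csv",
        destination="example_table",
        datasetId="whois2015",
        fields=["domain", "created:TIMESTAMP", "records:INTEGER"],
        sourceFormat="CSV",
        writeDisposition="WRITE_TRUNCATE",
    )
    return job_id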
def bq_tables(datasetId='whois2015', projectId="whois-980", tracer=logger.info, gs=None, maxResults=1000):
    gs = gs or service("bigquery", v="v2")
    tracer = tracer or logger.info

    token = ''
    tables = []
    while token is not None:
        rs = gs.tables().list(
            projectId=projectId,
            datasetId=datasetId,
            maxResults=maxResults,
            pageToken=token
        ).execute()

        page = rs.get("tables", [])
        tables.extend(page)
        for t in page:
            yield t["tableReference"]["tableId"]

        token = rs.get("nextPageToken")

    tracer("Got %d tables" % len(tables))
class BigQueryException(Exception):
    pass


class BigQuery_Job(object):
    """Takes a BQ job id, calls the status API, and turns the result into an object.
    Ref: https://cloud.google.com/bigquery/docs/reference/v2/jobs#resource
    """

    def __init__(self, job_id, tags=None, projectId="whois-980", skip_check=False, gs=None):
        self.job_id = job_id
        self.tags = tags
        self.projectId = projectId
        self.complete = False
        self.gs = gs

        resp = None
        if not skip_check:
            resp = bq_job_status(job_id, projectId=self.projectId, gs=gs)

        self.raw_json = resp
        self.from_response(resp)

    def __repr__(self):
        return "<BQ Job %s: %s Started: %s Finished: %s%s>" % (self.job_id, self.state,
                                                               self.startTime, self.endTime,
                                                               ' (%s)' % self.tags if self.tags else '')

    def update(self):
        if not self.complete:
            logger.debug("Checking status for %s", self.job_id)
            resp = bq_job_status(self.job_id, projectId=self.projectId, gs=self.gs)
            self.from_response(resp)

    def wait(self, timeout=10, raise_exception=True):
        import time
        while not self.complete:
            self.update()
            logger.info("Status for %s (%s)", self.job_id, self.state)
            if self.complete:
                break
            time.sleep(timeout)

        if raise_exception and self.state == "ERROR":
            raise BigQueryException("BQ: %s" % self.errors)

    def from_response(self, resp):
        if resp is None:
            resp = {}
        self.raw_json = resp

        def p(path):
            return dparse(self.raw_json, path)

        self.kind = p("/kind")
        self.etag = p("/etag")
        self.projectId = p("/jobReference/projectId")
        self.jobId = p("/jobReference/jobId")
        self.selfLink = p("/selfLink")

        self.creationTime = _safe_datetime_from_ms(p("/statistics/creationTime"))
        self.startTime = _safe_datetime_from_ms(p("/statistics/startTime"))
        self.endTime = _safe_datetime_from_ms(p("/statistics/endTime"))

        self.id = p("/id")
        self.state = p("/status/state")

        if p("/status/errorResult"):
            self.state = 'ERROR'
            self.errors = ["%s (%s)" % (u['message'], u.get("location", "")) for u in p("/status/errors") or []]
            self.errorResult = p("/status/errorResult/message")
        else:
            self.errors = None
            self.errorResult = None

        # job_type is the single key of the configuration section, e.g. 'load', 'query', 'extract'
        configuration = p("/configuration") or {}
        self.job_type = next(iter(configuration), None)

        # job_type == 'extract':
        self.destinationUri = p("/configuration/extract/destinationUri")

        # job_type == 'query':
        self.query = p("/configuration/query/query")
        self.destinationTableId = p("/configuration/query/destinationTable/tableId")
        self.destinationProjectId = p("/configuration/query/destinationTable/projectId")
        self.destinationDatasetId = p("/configuration/query/destinationTable/datasetId")

        self.inputFiles = _safe_int(p("/statistics/load/inputFiles"))
        self.inputBytes = _safe_int(p("/statistics/load/inputFileBytes"))
        self.outputRows = _safe_int(p("/statistics/load/outputRows"))

        self.user_email = p("/user_email")

        if self.state in ("DONE", "ERROR"):
            self.complete = True
def _safe_int(x):
    try:
        if x is not None:
            return int(x)
    except ValueError as ve:
        logger.warning("Could not convert %s to int", x)
        return x
    return x


def _safe_datetime_from_ms(x):
    try:
        if x is not None:
            x = _safe_int(x)
            return datetime.datetime.fromtimestamp(x / 1000)
        return None
    except Exception as e:
        logger.warning("Could not convert %s to date (%s)", x, e)
        return x
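

# A hedged end-to-end sketch: start a load job with bq_load, then poll it with
# BigQuery_Job.wait(), which raises BigQueryException if the job ends in ERROR.
# The gs:// URI and table name are hypothetical.
def _bq_job_example():
    job_id = bq_load("gs://my-bucket/uploads/example-*.csv", destination="example_table",
                     fields=["domain", "created:TIMESTAMP"])
    job = BigQuery_Job(job_id, tags="example load")
    job.wait(timeout=30)
    logger.info("Load finished: %s (%s rows)", job.state, job.outputRows)
    return job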