Last active
October 22, 2020 14:18
-
-
Save joocer/3a940ba517e9868909459bead0db55f2 to your computer and use it in GitHub Desktop.
simplify the handling of GCS blobs in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime, re | |
from google.cloud import storage | |
from functools import lru_cache | |
try: | |
import ujson as json | |
except ImportError: | |
import json | |
def select_dictionary_values(dictionary, values):
    """
    Select a subset of fields from a dictionary.
    Keys not present in the dictionary are included with a value of None.
    """
    subset = {}
    for key in values:
        subset[key] = dictionary.get(key)
    return subset
def select_all(x):
    """
    Default record predicate: accepts every record unconditionally.
    """
    return True
@lru_cache(maxsize=2)
def get_project():
    """
    Get the name of the current gcloud project.

    Shells out to the gcloud CLI; the result is cached by lru_cache so
    the subprocess only runs once per process.
    """
    import subprocess
    # original line was garbled: '[gcloud' had the bracket inside the string
    # literal and 'stout-subprocess.PIPE' was a typo for stdout=subprocess.PIPE.
    # list-form argv (shell=False) also avoids shell quoting/injection issues.
    result = subprocess.run(
        ['gcloud', 'config', 'get-value', 'project'],
        stdout=subprocess.PIPE)
    return result.stdout.decode('utf8').rstrip('\n')
def get_view_path(view, date=None, extention=".jsonl"):
    """
    Build the dated storage path for a view's daily file.
    The view name is normalised (non-alphanumeric runs become '_', lowercased,
    outer underscores trimmed) and the path is laid out as
    <view>/<YYYY_MM>/<view>_<YYYY_MM_DD><extention>.
    """
    if date is None:
        date = datetime.datetime.today()
    safe_name = re.sub('[^0-9a-zA-Z]+', '_', view).lower().strip('_')
    return f"{safe_name}/{date:%Y_%m}/{safe_name}_{date:%Y_%m_%d}{extention}"
def get_latest_blob(view, project=None, bucket=None, max_days=5):
    """
    Find the most recent daily blob for a view, looking back up to
    max_days days from today. Returns the blob name, or None if no
    blob exists inside the window.
    """
    # default to the current gcloud project when none is given
    if not project:
        project = get_project()
    for days_back in range(max_days):
        cycle_date = datetime.datetime.today() - datetime.timedelta(days_back)
        candidate = get_view_path(view, date=cycle_date)
        if get_blob(project, bucket, candidate):
            return candidate
    return None
def blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Reads lines from an arbitrarily long blob, line by line

    Parameters:
        project: GCP project
        bucket: GCS bucket
        blob_name: GCS blob name
        chunk_size: (optional) number of bytes to read at a time (default = 1Mb)
        delimiter: (optional) the record separator in the blob (default = new line)
    Returns a generator of lines in the file
    """
    blob = get_blob(project, bucket, blob_name)
    blob_size = blob.size
    # split on the ENCODED delimiter and decode only complete records:
    # decoding each raw chunk independently (as the original did) corrupts a
    # multi-byte UTF-8 character that happens to straddle a chunk boundary
    byte_delimiter = delimiter.encode('utf-8')
    carry_forward = b''
    cursor = 0
    while cursor < blob_size:
        chunk = blob.download_as_string(start=cursor, end=min(blob_size, cursor+chunk_size-1))
        cursor = cursor + len(chunk)
        # prepend the partial record carried over from the previous cycle
        chunk = carry_forward + chunk
        records = chunk.split(byte_delimiter)
        # the last record is likely to be incomplete, save it to carry forward
        carry_forward = records.pop()
        for record in records:
            yield record.decode('utf-8')
    if len(carry_forward) > 0:
        yield carry_forward.decode('utf-8')
def jsonify_reader(reader):
    """
    Wrap a reader of JSON strings, yielding each record parsed
    into a Python object.
    """
    yield from (json.loads(record) for record in reader)
def json_blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Read a blob of delimiter-separated JSON records, yielding each
    record parsed into a Python object.
    """
    for line in blob_reader(project, bucket, blob_name, chunk_size, delimiter):
        yield json.loads(line)
def load_to_pandas(view, project=None, bucket=None, columns=['*'], condition=select_all, start_date=None, end_date=None):
    """
    Load a virtual view into a pandas DataFrame.

    Parameters mirror virtual_data_reader: view name, optional GCP
    project/bucket, column subset, a per-record predicate, and an
    inclusive date range.
    """
    import pandas as pd
    data_reader = virtual_data_reader(view, project=project, bucket=bucket, columns=columns, condition=condition, start_date=start_date, end_date=end_date)
    # virtual_data_reader already yields parsed dicts, so feed them straight
    # to DataFrame — the original wrapped this in jsonify_reader, which would
    # call json.loads on a dict and raise TypeError
    return pd.DataFrame(data_reader)
def get_blob(project, bucket, blob_name):
    """
    Fetch a handle to the named blob from GCS.
    (Callers such as get_latest_blob rely on the result being falsy
    when the blob is not found.)
    """
    gcs_client = storage.Client(project=project)
    gcs_bucket = gcs_client.get_bucket(bucket)
    return gcs_bucket.get_blob(blob_name)
def virtual_data_reader(view, project=None, bucket=None, columns=None, condition=select_all, start_date=None, end_date=None):
    """
    Build a virtual view across multiple files.

    Parameters:
        view: name of the view to read
        project: GCP project (defaults to the current gcloud project)
        bucket: GCS bucket
        columns: fields to keep per record; None or ['*'] means all fields
        condition: predicate applied to each record (default accepts all)
        start_date: first day to read (default: today)
        end_date: last day to read, inclusive (default: today)

    Yields each matching record as a dict.
    """
    # if dates aren't provided, use today
    if not end_date:
        end_date = datetime.datetime.today()
    if not start_date:
        start_date = datetime.datetime.today()
    # if the project is None, use the current project
    if not project:
        project = get_project()
    # None (safe new default, avoids a mutable default argument) and the
    # legacy ['*'] sentinel both mean "keep every field"
    keep_all_columns = columns is None or list(columns) == ['*']
    # cycle through the days, loading each day's file
    for cycle in range(int((end_date - start_date).days) + 1):
        cycle_date = start_date + datetime.timedelta(cycle)
        cycle_file_location = get_view_path(view, cycle_date)
        reader = json_blob_reader(project, bucket, cycle_file_location)
        for record in reader:
            if condition(record):
                if not keep_all_columns:
                    record = select_dictionary_values(record, columns)
                yield record
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment