Skip to content

Instantly share code, notes, and snippets.

@joocer
Last active October 22, 2020 14:18
Show Gist options
  • Save joocer/3a940ba517e9868909459bead0db55f2 to your computer and use it in GitHub Desktop.
simplify the handling of GCS blobs in Python
import datetime, re
from google.cloud import storage
from functools import lru_cache
try:
import ujson as json
except ImportError:
import json
def select_dictionary_values(dictionary, values):
    """
    Return a new dictionary containing only the requested keys.

    Keys listed in `values` but absent from `dictionary` map to None.
    """
    subset = {}
    for key in values:
        subset[key] = dictionary.get(key, None)
    return subset
def select_all(x):
    """
    No-op predicate used as the default record filter — matches every record.
    """
    return True
@lru_cache(maxsize=2)
def get_project():
    """
    Return the current GCP project id as reported by the gcloud CLI.

    The result is cached (lru_cache) so the subprocess is only spawned once
    per process lifetime.
    """
    import subprocess
    # Pass argv as a list (shell=False) and capture stdout; the original line
    # was mangled ("'[gcloud'" / "stout-subprocess.PIPE") and could not run.
    result = subprocess.run(
        ['gcloud', 'config', 'get-value', 'project'],
        stdout=subprocess.PIPE)
    return result.stdout.decode('utf8').rstrip('\n')
def get_view_path(view, date=None, extention=".jsonl"):
    """
    Build the blob path for a view on a given date.

    The view name is normalized to lower snake_case and the path is laid
    out as <view>/<YYYY_MM>/<view>_<YYYY_MM_DD><extention>.
    (`extention` keeps its original spelling — it is part of the public
    signature and renaming it would break keyword callers.)
    """
    effective_date = date if date else datetime.datetime.today()
    safe_name = re.sub('[^0-9a-zA-Z]+', '_', view).lower().strip('_')
    return f"{safe_name}/{effective_date:%Y_%m}/{safe_name}_{effective_date:%Y_%m_%d}{extention}"
def get_latest_blob(view, project=None, bucket=None, max_days=5):
    """
    Find the most recent existing blob for a view, scanning backwards from
    today for up to `max_days` days.

    Returns the blob name, or None when no blob exists in the window.
    """
    # fall back to the current gcloud project when none is given
    if not project:
        project = get_project()
    for days_back in range(max_days):
        candidate_date = datetime.datetime.today() - datetime.timedelta(days_back)
        candidate_name = get_view_path(view, date=candidate_date)
        if get_blob(project, bucket, candidate_name):
            return candidate_name
    return None
def blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Read delimited records from an arbitrarily large blob, one at a time,
    without holding the whole blob in memory.

    Parameters:
        project: GCP project
        bucket: GCS bucket
        blob_name: GCS blob name
        chunk_size: (optional) number of bytes to fetch per request (default = 1Mb)
        delimiter: (optional) the record separator in the blob (default = new line)

    Returns a generator of records (strings) in the blob.
    """
    blob = get_blob(project, bucket, blob_name)
    blob_size = blob.size
    carry_forward = ''
    cursor = 0
    while (cursor < blob_size):
        # 'end' is passed as cursor+chunk_size-1, i.e. treated as inclusive,
        # capped at the blob size for the final partial chunk
        chunk = blob.download_as_string(start=cursor, end=min(blob_size, cursor+chunk_size-1))
        cursor = cursor + len(chunk)
        # NOTE(review): each chunk is decoded independently, which assumes no
        # multi-byte UTF-8 character straddles a chunk boundary — confirm for
        # blobs containing non-ASCII data
        chunk = chunk.decode('utf-8')
        # prepend the partial record carried over from the previous cycle
        chunk = carry_forward + chunk
        lines = chunk.split(delimiter)
        # the last line is likely to be incomplete, save it to carry forward
        carry_forward = lines.pop()
        yield from lines
    # emit the final record (blob has no trailing delimiter)
    if len(carry_forward) > 0:
        yield carry_forward
def jsonify_reader(reader):
    """
    Lazily parse each item yielded by `reader` as a JSON document.
    """
    return (json.loads(item) for item in reader)
def json_blob_reader(project, bucket, blob_name, chunk_size=1024*1024, delimiter='\n'):
    """
    Stream a blob of delimiter-separated JSON documents, yielding each
    parsed record.
    """
    raw_records = blob_reader(project, bucket, blob_name, chunk_size, delimiter)
    for record in jsonify_reader(raw_records):
        yield record
def load_to_pandas(view, project=None, bucket=None, columns=None, condition=select_all, start_date=None, end_date=None):
    """
    Load a view into a pandas DataFrame.

    Parameters:
        view: name of the view to load
        project: (optional) GCP project, defaults to the current project
        bucket: (optional) GCS bucket
        columns: (optional) fields to keep; ['*'] (the default) keeps all
        condition: (optional) predicate applied to each record
        start_date / end_date: (optional) date range, both default to today

    Returns a pandas DataFrame of the matching records.
    """
    import pandas as pd
    # avoid a mutable default argument; None stands in for ['*'] ("all columns")
    if columns is None:
        columns = ['*']
    data_reader = virtual_data_reader(view, project=project, bucket=bucket, columns=columns, condition=condition, start_date=start_date, end_date=end_date)
    json_reader = jsonify_reader(data_reader)
    return pd.DataFrame(json_reader)
def get_blob(project, bucket, blob_name):
    """
    Fetch the blob object for `blob_name` from the given bucket
    (falsy when the blob does not exist — see usage in get_latest_blob).
    """
    gcs_client = storage.Client(project=project)
    target_bucket = gcs_client.get_bucket(bucket)
    return target_bucket.get_blob(blob_name)
def virtual_data_reader(view, project=None, bucket=None, columns=None, condition=select_all, start_date=None, end_date=None):
    """
    Build a virtual view across multiple daily files, yielding each record
    that satisfies `condition`.

    Parameters:
        view: name of the view to read
        project: (optional) GCP project, defaults to the current project
        bucket: (optional) GCS bucket
        columns: (optional) fields to keep per record; ['*'] (the default) keeps all
        condition: (optional) predicate each record must satisfy
        start_date / end_date: (optional) inclusive date range, both default to today
    """
    # avoid a mutable default argument; None stands in for ['*'] ("all columns")
    if columns is None:
        columns = ['*']
    # if dates aren't provided, use today
    if not end_date:
        end_date = datetime.datetime.today()
    if not start_date:
        start_date = datetime.datetime.today()
    # if the project is None, use the current project
    if not project:
        project = get_project()
    # cycle through the days, loading each day's file
    for cycle in range(int((end_date - start_date).days) + 1):
        cycle_date = start_date + datetime.timedelta(cycle)
        cycle_file_location = get_view_path(view, cycle_date)
        reader = json_blob_reader(project, bucket, cycle_file_location)
        for record in reader:
            if condition(record):
                if columns != ['*']:
                    record = select_dictionary_values(record, columns)
                yield record
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment