Helpers for Google Cloud
@ZaxR · Created February 13, 2019
from typing import Iterable, Optional
from uuid import uuid4

import pandas as pd
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound

def gcs_file_exists(project_name: str, bucket_name: str, file_name: str) -> bool:
    """Check whether a specific Google Cloud Storage file exists.

    This function simply checks whether a specific Google Cloud Storage file exists
    and returns a Boolean value. If the bucket doesn't exist, False is returned.

    Args:
        project_name: Name of the Google Cloud project.
        bucket_name: Name of the bucket in which the file resides.
        file_name: Name of the file within the bucket.

    Returns:
        Bool reflecting whether or not the file exists in GCS.

    Examples:
        >>> gcs_file_exists(project_name='spins-retail-solutions',
        ...                 bucket_name='product-intelligence',
        ...                 file_name='data/spins_upcs/spins_upcs_000000000000.csv')

    """
    storage_client = storage.Client(project=project_name)
    try:
        storage_bucket = storage_client.get_bucket(bucket_name)
    except NotFound:
        return False
    return storage_bucket.blob(file_name).exists()
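
# Illustrative usage sketch (not part of the original gist; the project, bucket, and file
# names below are hypothetical placeholders): skip an export if the file is already in GCS.
def example_gcs_file_exists() -> bool:
    return gcs_file_exists(project_name="my-project",
                           bucket_name="my-bucket",
                           file_name="exports/data_000000000000.csv")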

def bq_table_exists(project_id: str, dataset_id: str, table_id: str) -> bool:
    """Check whether a given BigQuery table exists.

    Args:
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.

    Returns:
        Bool reflecting whether or not the table exists in BigQuery.

    """
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False
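
# Illustrative usage sketch (hypothetical names; not from the original gist): check whether
# a table still needs to be created before running a write job against it.
def example_table_missing() -> bool:
    return not bq_table_exists(project_id="my-project",
                               dataset_id="my_dataset",
                               table_id="my_table")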

def bq_query_to_iterator(query: str, dialect: str = "legacy", use_query_cache: bool = True):
    """Run a given query, returning a QueryJob whose results can be iterated.

    Args:
        query: A valid BigQuery query in syntax matching the dialect arg.
        dialect: BigQuery SQL dialect - either 'legacy' or 'standard'.
        use_query_cache: Specifies if a cached version of the query's results should be used, if available.

    Returns:
        A google.cloud.bigquery QueryJob; iterating it (or calling .result()) yields the
        rows of the query result.

    """
    client = bigquery.Client()

    # Set up the query configuration.
    job_config = bigquery.QueryJobConfig()
    job_config.use_query_cache = use_query_cache
    if dialect == "legacy":
        job_config.use_legacy_sql = True
        job_config.allow_large_results = True

    return client.query(query, job_config=job_config)

def bq_query_to_df(query: str, dialect: str = "legacy", use_query_cache: bool = True):
    """Wrapper around bq_query_to_iterator() that instead returns results as a pd.DataFrame.

    Args:
        See bq_query_to_iterator().

    Returns:
        A pd.DataFrame of the query result.

    """
    iterator = bq_query_to_iterator(query=query, dialect=dialect, use_query_cache=use_query_cache)
    return iterator.to_dataframe()
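
# Illustrative usage sketch (not part of the original gist; assumes default application
# credentials and uses a public sample table): stream rows via the iterator helper, then
# load the same query into pandas.
def example_query_helpers() -> pd.DataFrame:
    query = "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare] LIMIT 10"
    for row in bq_query_to_iterator(query, dialect="legacy"):
        print(row)  # Each row behaves like a mapping of column name -> value.
    return bq_query_to_df(query, dialect="legacy")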

def bq_query_to_table(query: str, project_id: str, dataset_id: str, table_id: str,
                      dialect="legacy", write_disposition="WRITE_TRUNCATE",
                      use_query_cache: bool = True):
    """Run a given query, saving the results as a table with the given name.

    Args:
        query: A valid BigQuery query in syntax matching the dialect arg.
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.
        dialect: BigQuery SQL dialect - either 'legacy' or 'standard'.
        write_disposition: Specifies the behavior when writing query results to an existing table.
            Options:
                'WRITE_APPEND': Rows may be appended to an existing table.
                'WRITE_EMPTY': Output table must be empty.
                'WRITE_TRUNCATE': Specifies that the write should replace the table.
        use_query_cache: Specifies if a cached version of the query's results should be used, if available.

    Returns:
        An iterator of rows of the query result.

    """
    client = bigquery.Client()

    # Set up the query configuration.
    job_config = bigquery.QueryJobConfig()
    job_config.use_query_cache = use_query_cache
    job_config.destination = client.dataset(dataset_id, project=project_id).table(table_id)
    job_config.write_disposition = write_disposition
    if dialect == "legacy":
        job_config.use_legacy_sql = True
        job_config.allow_large_results = True

    query_job = client.query(query, job_config=job_config)
    # LOG.info("Running query...")
    iterator = query_job.result()  # Creates the table and an iterator of results.
    msg = (f"[{project_id}:{dataset_id}.{table_id}]"
           if dialect == "legacy"
           else f"`{project_id}.{dataset_id}.{table_id}`")
    # LOG.info(f"Query results written to {msg}")
    return iterator
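
# Illustrative usage sketch (hypothetical project/dataset/table names; not part of the
# original gist): materialize a query's results into a destination table, appending to it.
def example_materialize_query() -> None:
    bq_query_to_table(query="SELECT word FROM [bigquery-public-data:samples.shakespeare]",
                      project_id="my-project", dataset_id="my_dataset", table_id="words",
                      dialect="legacy", write_disposition="WRITE_APPEND")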

def bq_query_to_table_and_df(query: str, project_id: str, dataset_id: str, table_id: str,
                             dialect="legacy", write_disposition="WRITE_TRUNCATE",
                             use_query_cache: bool = True):
    """Wrapper around bq_query_to_table() that also returns the results as a pd.DataFrame.

    Args:
        See bq_query_to_table().

    Returns:
        A pd.DataFrame of the query result.

    """
    iterator = bq_query_to_table(query=query, project_id=project_id, dataset_id=dataset_id,
                                 table_id=table_id, dialect=dialect,
                                 write_disposition=write_disposition, use_query_cache=use_query_cache)
    return iterator.to_dataframe()

def bq_table_to_gcs(gs_path: str, project_id: str, dataset_id: str, table_id: str,
                    location: str = 'US'):
    """Save the entire given table as a CSV at the given gs_path.

    Args:
        gs_path: Path of the form "gs://<bucket>/<path>.<ext>".
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.
        location: The geographic location of the table's hosting.

    """
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    extract_job = client.extract_table(table_ref, gs_path, location=location)
    result = extract_job.result()  # Saves the table as a CSV, but does not return data.
    # LOG.info(f"Exported `{project_id}.{dataset_id}.{table_id}` to {gs_path}")
    return result
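
# Illustrative usage sketch (hypothetical names; not part of the original gist): export an
# existing table to a CSV in GCS.
def example_export_table() -> None:
    bq_table_to_gcs(gs_path="gs://my-bucket/exports/my_table.csv",
                    project_id="my-project", dataset_id="my_dataset", table_id="my_table")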

def bq_query_to_gcs(query: str, gs_path: str, project_id: str, dataset_id: str, location: str = 'US',
                    dialect="legacy", write_disposition="WRITE_EMPTY", use_query_cache: bool = True):
    """Run a BigQuery query, save it to a table, and export a CSV to GCS.

    Notes:
        Wrapper for bq_query_to_table() and bq_table_to_gcs(). As a result, this function
        creates an interim table, which is then cleaned up. To keep the interim table, run
        the underlying bq_query_to_table() and bq_table_to_gcs() separately.
        The write_disposition default is "WRITE_EMPTY" in case there is an accidental name collision.

    """
    # Assign a random name to our temporary table.
    table_id = str(uuid4()).replace("-", "_")

    bq_query_to_table(query=query, project_id=project_id, dataset_id=dataset_id,
                      table_id=table_id, dialect=dialect, write_disposition=write_disposition,
                      use_query_cache=use_query_cache)
    bq_table_to_gcs(gs_path=gs_path, project_id=project_id, dataset_id=dataset_id, table_id=table_id,
                    location=location)

    # Delete the interim table created by bq_query_to_table().
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    client.delete_table(table_ref)
    # LOG.info(f"Saved query results to {gs_path}")

def pull_table_uniq_col_vals(table_id: str, dataset_id: str, project_id: str, dialect: str = 'legacy',
                             ignore_cols: Iterable[str] = (), limit: Optional[int] = None,
                             counts: bool = True):
    """Get all unique values for each column in a BigQuery table.

    Args:
        table_id: BigQuery table name.
        dataset_id: BigQuery dataset name.
        project_id: BigQuery project name.
        dialect: BigQuery SQL dialect. Default is 'legacy'; other option is 'standard'.
        ignore_cols: Partial or full column names to ignore.
        limit: Limit on the number of results to return.
        counts: Counts of each unique value per column. Default is True.
            Note: Will be added as a comma separation after the value.

    Returns:
        pd.DataFrame of all unique values per table column, optionally with counts.

    """
    client = bigquery.Client()
    table = client.dataset(dataset_id, project=project_id).table(table_id)
    schema = client.get_table(table).schema
    col_names = [col.name for col in schema]

    if dialect == 'legacy':
        table_path = f"[{project_id}:{dataset_id}.{table_id}]"
    elif dialect == 'standard':
        table_path = f"`{project_id}.{dataset_id}.{table_id}`"
    else:
        raise ValueError("Please select a valid dialect - either 'legacy' or 'standard'.")

    to_concat = []
    limit_clause = f"LIMIT {limit}" if limit is not None else ""
    for col in col_names:
        if not any(word in col.lower() for word in ignore_cols):
            # CONCAT works in both legacy and standard SQL; alias the plain column as f0_
            # so the outer SELECT works whether or not counts are requested.
            select = (f'CONCAT(CAST({col} AS STRING), ", ", CAST(COUNT({col}) AS STRING))'
                      if counts else f'{col} AS f0_')
            query = f"""
                SELECT
                  temp.f0_ AS {col}
                FROM (SELECT
                        {select}
                      FROM
                        {table_path}
                      GROUP BY
                        {col}
                      {limit_clause}) temp
            """
            to_concat.append(pd.read_gbq(query, project_id=project_id, dialect=dialect))
    return pd.concat(to_concat, axis=1)
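
# Illustrative usage sketch (hypothetical names; not part of the original gist): profile a
# table's columns, skipping anything whose name contains "id" or "timestamp".
def example_profile_table() -> pd.DataFrame:
    return pull_table_uniq_col_vals(table_id="my_table", dataset_id="my_dataset",
                                    project_id="my-project", dialect="legacy",
                                    ignore_cols=("id", "timestamp"), limit=100, counts=True)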