Helpers for Google Cloud
from typing import Iterable, Optional
from uuid import uuid4

import pandas as pd
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound
def gcs_file_exists(project_name: str, bucket_name: str, file_name: str) -> bool:
    """Check whether a specific Google Cloud Storage file exists.

    This function checks whether a specific Google Cloud Storage file exists
    and returns a boolean. If the bucket doesn't exist, False is returned.

    Args:
        project_name: Name of the Google Cloud project.
        bucket_name: Name of the bucket in which the file resides.
        file_name: Name of the file within the bucket.

    Returns:
        Bool reflecting whether or not the file exists in GCS.

    Examples:
        >>> gcs_file_exists(project_name='spins-retail-solutions',
        ...                 bucket_name='product-intelligence',
        ...                 file_name='data/spins_upcs/spins_upcs_000000000000.csv')
    """
    storage_client = storage.Client(project=project_name)
    try:
        storage_bucket = storage_client.get_bucket(bucket_name)
    except NotFound:
        return False
    return storage_bucket.blob(file_name).exists()
def bq_table_exists(project_id: str, dataset_id: str, table_id: str) -> bool:
    """Checks if a given BigQuery table exists.

    Args:
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.

    Returns:
        Bool reflecting whether or not the table exists in BigQuery.
    """
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False
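
# Usage sketch (hypothetical identifiers; assumes default application credentials
# and that the client can see the referenced project):
#
# >>> bq_table_exists(project_id="my-project", dataset_id="my_dataset", table_id="my_table")
# False
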
def bq_query_to_iterator(query: str, dialect: str = "legacy", use_query_cache: bool = True):
    """Runs a given query, returning the resulting query job, which can be iterated over.

    Args:
        query: A valid BigQuery query in syntax matching the dialect arg.
        dialect: BigQuery SQL dialect - either 'legacy' or 'standard'.
        use_query_cache: Specifies if a cached version of the query's results should be used, if available.

    Returns:
        A bigquery QueryJob; iterating it yields result rows, and to_dataframe()
        converts the results to a pd.DataFrame.
    """
    client = bigquery.Client()

    # Set up the query configuration.
    job_config = bigquery.QueryJobConfig()
    job_config.use_query_cache = use_query_cache
    if dialect == "legacy":
        job_config.use_legacy_sql = True
        # Note: allow_large_results is only valid when a destination table is set,
        # so it is configured in bq_query_to_table() rather than here.

    return client.query(query, job_config=job_config)
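
# Usage sketch (example query against the public shakespeare sample table; iterating
# the returned job yields Row objects):
#
# >>> job = bq_query_to_iterator("SELECT word FROM [publicdata:samples.shakespeare] LIMIT 5")
# >>> for row in job:
# ...     print(row[0])
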
def bq_query_to_df(query: str, dialect: str = "legacy", use_query_cache: bool = True):
    """Wrapper around bq_query_to_iterator() to instead return results as a pd.DataFrame.

    Args:
        See bq_query_to_iterator().

    Returns:
        A pd.DataFrame of the query result.
    """
    iterator = bq_query_to_iterator(query=query, dialect=dialect, use_query_cache=use_query_cache)
    return iterator.to_dataframe()
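
# Usage sketch (same assumptions as above, but the result comes back as a pd.DataFrame):
#
# >>> df = bq_query_to_df("SELECT word FROM [publicdata:samples.shakespeare] LIMIT 5")
# >>> df.shape
# (5, 1)
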
def bq_query_to_table(query: str, project_id: str, dataset_id: str, table_id: str,
                      dialect="legacy", write_disposition="WRITE_TRUNCATE",
                      use_query_cache: bool = True):
    """Runs a given query, saving the results as a table with the given name.

    Args:
        query: A valid BigQuery query in syntax matching the dialect arg.
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.
        dialect: BigQuery SQL dialect - either 'legacy' or 'standard'.
        write_disposition: Specifies the behavior when writing query results to an existing table.
            Options:
                'WRITE_APPEND'  : Rows may be appended to an existing table.
                'WRITE_EMPTY'   : Output table must be empty.
                'WRITE_TRUNCATE': Specifies that write should replace a table.
        use_query_cache: Specifies if a cached version of the query's results should be used, if available.

    Returns:
        An iterator of rows of the query result.
    """
    client = bigquery.Client()

    # Set up the query configuration.
    job_config = bigquery.QueryJobConfig()
    job_config.use_query_cache = use_query_cache
    job_config.destination = client.dataset(dataset_id, project=project_id).table(table_id)
    job_config.write_disposition = write_disposition
    if dialect == "legacy":
        job_config.use_legacy_sql = True
        job_config.allow_large_results = True

    query_job = client.query(query, job_config=job_config)
    # LOG.info("Running query...")
    iterator = query_job.result()  # creates the table and an iterator of results
    msg = (f"[{project_id}:{dataset_id}.{table_id}]"
           if dialect == "legacy"
           else f"`{project_id}.{dataset_id}.{table_id}`")
    # LOG.info(f"Query results written to {msg}")
    return iterator
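
# Usage sketch (hypothetical destination project/dataset/table; with the default
# WRITE_TRUNCATE disposition, an existing table of the same name is replaced):
#
# >>> rows = bq_query_to_table("SELECT word FROM [publicdata:samples.shakespeare] LIMIT 5",
# ...                          project_id="my-project", dataset_id="my_dataset",
# ...                          table_id="shakespeare_sample")
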
def bq_query_to_table_and_df(query: str, project_id: str, dataset_id: str, table_id: str,
                             dialect="legacy", write_disposition="WRITE_TRUNCATE",
                             use_query_cache: bool = True):
    """Wrapper around bq_query_to_table() to also return results as a pd.DataFrame.

    Args:
        See bq_query_to_table().

    Returns:
        A pd.DataFrame of the query result.
    """
    iterator = bq_query_to_table(query=query, project_id=project_id, dataset_id=dataset_id, table_id=table_id,
                                 dialect=dialect, write_disposition=write_disposition, use_query_cache=use_query_cache)
    return iterator.to_dataframe()
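
# Usage sketch (same assumptions as bq_query_to_table(), but the results also come
# back as a pd.DataFrame):
#
# >>> df = bq_query_to_table_and_df("SELECT word FROM [publicdata:samples.shakespeare] LIMIT 5",
# ...                               project_id="my-project", dataset_id="my_dataset",
# ...                               table_id="shakespeare_sample")
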
def bq_table_to_gcs(gs_path: str, project_id: str, dataset_id: str, table_id: str,
                    location: str = 'US'):
    """Saves the entire given table as a csv at the given gs_path.

    Args:
        gs_path: Path of the form "gs://<bucket>/<path>.<ext>".
        project_id: Google project name.
        dataset_id: Google dataset name.
        table_id: Google table name.
        location: The geographic location where the table is hosted. Default is 'US'.
    """
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    extract_job = client.extract_table(table_ref, gs_path, location=location)
    result = extract_job.result()  # saves the table as a csv, but does not return data
    # LOG.info(f"Exported `{project_id}.{dataset_id}.{table_id}` to {gs_path}")
    return result
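
# Usage sketch (hypothetical table and bucket; the location arg should match the
# dataset's location, assumed to be 'US' here):
#
# >>> bq_table_to_gcs(gs_path="gs://my-bucket/exports/shakespeare_sample.csv",
# ...                 project_id="my-project", dataset_id="my_dataset",
# ...                 table_id="shakespeare_sample")
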
def bq_query_to_gcs(query: str, gs_path: str, project_id: str, dataset_id: str, location: str = 'US',
                    dialect="legacy", write_disposition="WRITE_EMPTY", use_query_cache: bool = True):
    """Runs a BigQuery query, saves it to a table, and exports a csv to GCS.

    Notes:
        Wrapper for bq_query_to_table() and bq_table_to_gcs(). As a result, this function
        creates an interim table, which is then cleaned up. To keep the interim table, run
        the underlying bq_query_to_table() and bq_table_to_gcs() separately.

        The write_disposition default is set to "WRITE_EMPTY" in case there is an
        accidental name collision.
    """
    # Assign a random name for the temporary table.
    table_id = str(uuid4()).replace("-", "_")

    bq_query_to_table(query=query, project_id=project_id, dataset_id=dataset_id,
                      table_id=table_id, dialect=dialect, write_disposition=write_disposition,
                      use_query_cache=use_query_cache)
    bq_table_to_gcs(gs_path=gs_path, project_id=project_id, dataset_id=dataset_id, table_id=table_id,
                    location=location)

    # Delete the interim table created by bq_query_to_table().
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
    client.delete_table(table_ref)
    # LOG.info(f"Saved query results to {gs_path}")
def pull_table_uniq_col_vals(table_id: str, dataset_id: str, project_id: str, ignore_cols: Iterable[str],
                             dialect: str = 'legacy', limit: Optional[int] = None, counts: bool = True):
    """Get all unique values for each column in a BigQuery table.

    Args:
        table_id: BigQuery table name.
        dataset_id: BigQuery dataset name.
        project_id: BigQuery project name.
        ignore_cols: Partial or full column names to ignore.
        dialect: BigQuery SQL dialect. Default is 'legacy'; other option is 'standard'.
        limit: Limit on the number of unique values to return per column.
        counts: Whether to include the count of each unique value per column. Default is True.
            Note: The count is appended after the value, separated by a comma.

    Returns:
        pd.DataFrame of all unique values per table column, optionally with counts.
    """
    client = bigquery.Client()
    table = client.dataset(dataset_id, project=project_id).table(table_id)
    schema = client.get_table(table).schema
    col_names = [col.name for col in schema]

    if dialect == 'legacy':
        table_path = f"[{project_id}:{dataset_id}.{table_id}]"
    elif dialect == 'standard':
        table_path = f"`{project_id}.{dataset_id}.{table_id}`"
    else:
        raise ValueError("Please select a valid dialect - either 'legacy' or 'standard'.")

    to_concat = []
    limit = f"LIMIT {limit}" if limit is not None else ""
    for col in col_names:
        if not any(word in col.lower() for word in ignore_cols):
            # CONCAT works in both legacy and standard SQL; '+' does not concatenate strings.
            # The inner expression is aliased as f0_ so the outer SELECT works for both
            # the counts and no-counts cases.
            select = (f'CONCAT(CAST({col} AS STRING), ", ", CAST(COUNT({col}) AS STRING))'
                      if counts else col)
            query = f"""
                SELECT
                    temp.f0_ AS {col}
                FROM (SELECT
                          {select} AS f0_
                      FROM
                          {table_path}
                      GROUP BY
                          {col}
                      {limit}) temp
            """
            to_concat.append(pd.read_gbq(query, project_id=project_id, dialect=dialect))
    return pd.concat(to_concat, axis=1)
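
# Usage sketch (hypothetical table; columns whose lowercased names contain 'id' or
# 'date' are skipped, and with counts=True each value comes back as "value, count"):
#
# >>> uniques = pull_table_uniq_col_vals(table_id="my_table", dataset_id="my_dataset",
# ...                                    project_id="my-project",
# ...                                    ignore_cols=["id", "date"], limit=1000)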