Created
April 3, 2020 18:41
-
-
Save dharamsk/5cf23c08a26f7db2a5b03292e86e006a to your computer and use it in GitHub Desktop.
This Python snippet was written to modify all schemas in a dataset, "relaxing" every column that was REQUIRED to be NULLABLE. In this case, I applied it only to tables that were modified in the last 24 hours; however, it could be adapted to perform other useful operations on all tables in a dataset.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# For all tables modified in the last 24 hours,
# relax all columns to be NULLABLE instead of REQUIRED.
# Python 3
from datetime import datetime, timedelta, timezone

from google.cloud import bigquery
# Module-level configuration for the schema-relaxation script.
CLIENT = bigquery.Client()  # auth using default credentials/project
DATASET = 'your_dataset'  # placeholder: dataset whose tables are scanned
PROJECT = 'your_project'  # placeholder: project that contains DATASET
# Naive UTC timestamp for "24 hours ago". It is derived from an aware
# now() because datetime.utcnow() is deprecated since Python 3.12; the
# tzinfo is then stripped so the value stays naive, exactly like the
# utcnow()-based value it replaces.
CUTOFF = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=1)
def get_schema_json_from_table(table):
    """Return the table's schema as a list of API-representation dicts.

    Args:
        table: An object exposing a ``schema`` iterable whose items each
            provide a ``to_api_repr()`` method (e.g. the table object
            returned by ``client.get_table()``).

    Returns:
        A new list holding one ``to_api_repr()`` result per column, in
        schema order.
    """
    return [col.to_api_repr() for col in table.schema]
def recursively_relax_fields(schema_json):
    """Relax every REQUIRED column in ``schema_json`` to NULLABLE, in place.

    Args:
        schema_json: A list of column dicts in API representation. Each
            dict may carry a 'mode' and a 'type' key; nested record
            columns carry their sub-columns under 'fields'.

    Returns:
        None. The dicts inside ``schema_json`` are mutated in place.
    """
    for col in schema_json:
        # .get() tolerates column dicts that omit 'mode' altogether.
        if col.get('mode') == 'REQUIRED':
            col['mode'] = 'NULLABLE'
        # Descend regardless of the parent's own mode, so REQUIRED
        # sub-columns inside NULLABLE or REPEATED records are relaxed too.
        # 'STRUCT' is accepted alongside 'RECORD' as a nested-schema type.
        if col.get('type') in ('RECORD', 'STRUCT'):
            recursively_relax_fields(col.get('fields', []))
def create_schema_object_from_json_dict(schema_json):
    """Build a list of ``SchemaField`` objects from API-representation dicts.

    Args:
        schema_json: A valid BigQuery schema as JSON, i.e. a list of
            column dicts accepted by ``SchemaField.from_api_repr()``.

    Returns:
        A new list with one ``SchemaField`` per input dict, in input order.
    """
    return [
        bigquery.schema.SchemaField.from_api_repr(col) for col in schema_json
    ]
def relax_schema(table):
    """Relax all REQUIRED columns of ``table`` to NULLABLE and push the change.

    Args:
        table: A table object from ``client.get_table()``. Its ``schema``
            attribute is overwritten with the relaxed schema.

    Returns:
        None. The update is sent through the module-level ``CLIENT``.
    """
    # Round-trip through the JSON form so nested columns can be edited
    # as plain dicts.
    schema_json = get_schema_json_from_table(table)
    recursively_relax_fields(schema_json)
    new_schema = create_schema_object_from_json_dict(schema_json)
    # Overwrite the existing schema property of the table.
    table.schema = new_schema
    # Push the change up; only the 'schema' field is sent in the update.
    CLIENT.update_table(table, ['schema'])
# Driver: relax the schema of every table in PROJECT.DATASET that was
# modified after CUTOFF, recording the ids of the tables that were touched.
tables = list(CLIENT.list_tables(f'{PROJECT}.{DATASET}'))
modified_tables = []
for t_id in tables:
    # Fetch the full table object; its modified time and schema are
    # read below.
    t = CLIENT.get_table(t_id)
    # Drop tzinfo so the timestamp can be compared with the naive CUTOFF.
    if t.modified.replace(tzinfo=None) > CUTOFF:
        print(f'Telling {t.table_id} to chill the F out..')
        relax_schema(t)
        modified_tables.append(t.table_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment