Demo of how to upload to BigQuery with Python 3. See https://notes.webutvikling.org/starting-bigquery/
# 1. Install dependencies (see requirements.txt)
# 2. Set the service account json in code,
#    or with the env var "SA_CREDENTIALS"
# 3. Run this file to import test data:
#    "python3 bigquery_import.py"
import os
import json

from google.cloud import bigquery
from google.oauth2.service_account import Credentials
import pandas

TABLE_NAME = 'nameexample'
DATASET_NAME = 'example'

def get_service_account():
    service_account_json = os.environ.get('SA_CREDENTIALS')
    # For local testing, you can uncomment the lines below and paste in the
    # service account json file downloaded from Google Cloud
    #
    # service_account_json = json.dumps({
    #     "type": "service_account",
    #     "project_id": "myexampleproject",
    #     "private_key_id": "975c1e87f065c76c9915887acc1afedeb59600b0",
    #     "private_key": "-----BEGIN PRIVATE KEY-----\ndummyexample/12345\==\n-----END PRIVATE KEY-----\n",
    #     "client_email": "[email protected]",
    #     "client_id": "123457110251480388426",
    #     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    #     "token_uri": "https://oauth2.googleapis.com/token",
    #     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    #     "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-r7tvo%40myexampleproject.iam.gserviceaccount.com"
    # })
    if not service_account_json:
        raise Exception('Missing env var SA_CREDENTIALS')
    try:
        service_account = json.loads(service_account_json)
    except Exception:
        print('Could not load SA_CREDENTIALS as json!')
        raise
    # Env vars often store the key with escaped newlines; restore them.
    pk = service_account.get('private_key')
    service_account['private_key'] = pk.replace('\\n', '\n')
    try:
        # This validates the credentials data
        Credentials.from_service_account_info(
            service_account
        )
        return service_account
    except Exception:
        print('JSON credentials are wrong or malformed!')
        raise
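
# Tip (an assumption, not part of the original gist): instead of pasting the
# json into the code above, the env var can be filled from a downloaded key
# file before get_service_account() is called, e.g.:
#
# import pathlib
# os.environ['SA_CREDENTIALS'] = pathlib.Path('service-account.json').read_text()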

def get_project_id():
    sa = get_service_account()
    return sa['project_id']


def get_client():
    service_account = get_service_account()
    google_credentials = Credentials.from_service_account_info(
        service_account
    )
    client = bigquery.Client(credentials=google_credentials)
    return client

def _create_dataset(*, project_id: str, dataset_name: str):
    client = get_client()
    dataset_id = f'{project_id}.{dataset_name}'
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "europe-north1"
    dataset = client.create_dataset(
        dataset,
        timeout=30
    )
    print(f'Created dataset {dataset.project}.{dataset.dataset_id}')
    return dataset

def _create_table(
    *,
    project_id: str,
    dataset_name: str,
    table_name: str,
    schema
):
    client = get_client()
    table = bigquery.Table(
        f"{project_id}.{dataset_name}.{table_name}",
        schema=schema,
    )
    table = client.create_table(table)
    print(f'Created table {table.project}.{table.dataset_id}.{table.table_id}')
    return table

def _delete_dataset(*, project_id: str, dataset_name: str):
    client = get_client()
    dataset_id = f'{project_id}.{dataset_name}'
    # delete_dataset accepts the dataset id directly; no location needed
    client.delete_dataset(
        dataset_id,
        timeout=30
    )
    print(f'Deleted dataset {dataset_id}')


def _delete_table(
    *,
    project_id: str,
    dataset_name: str,
    table_name: str,
):
    client = get_client()
    table_id = f"{project_id}.{dataset_name}.{table_name}"
    # delete_table returns None, so there is nothing useful to return here
    client.delete_table(table_id)
    print(f'Deleted table {table_id}')

def do_initial_setup():
    project_id = get_project_id()
    _create_dataset(
        project_id=project_id,
        dataset_name=DATASET_NAME,
    )
    _create_table(
        project_id=project_id,
        dataset_name=DATASET_NAME,
        table_name=TABLE_NAME,
        schema=bq_schema
    )
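
# Note (not in the original gist): client.create_dataset and
# client.create_table both accept exists_ok=True, which would make
# do_initial_setup() idempotent and remove the need for the try/except
# in the __main__ block below.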

def send_to_bq(report_records):
    client = get_client()
    project_id = get_project_id()
    dataframe = pandas.DataFrame(
        report_records,
        # In the loaded table, the column order reflects the order of the
        # columns in the DataFrame.
        columns=bq_columns,
    )
    job_config = bigquery.LoadJobConfig(
        # Specify a (partial) schema. All columns are always written to the
        # table. The schema is used to assist in data type definitions.
        schema=bq_schema,
        # Optionally, set the write disposition. BigQuery appends loaded rows
        # to an existing table by default, but with WRITE_TRUNCATE write
        # disposition it replaces the table with the loaded data.
        write_disposition="WRITE_TRUNCATE",
    )
    job = client.load_table_from_dataframe(
        dataframe,
        project=project_id,
        destination=f'{DATASET_NAME}.{TABLE_NAME}',
        job_config=job_config
    )
    job.result()  # Wait for the job to complete.
    print(f"Uploaded today's records to {DATASET_NAME}.{TABLE_NAME}")

bq_types = {
    'first_name': bigquery.enums.SqlTypeNames.STRING,
    'last_name': bigquery.enums.SqlTypeNames.STRING,
}
bq_columns = list(bq_types.keys())
bq_schema = [
    bigquery.SchemaField(name, sql_type)
    for name, sql_type in bq_types.items()
]
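
# More columns can be added by extending bq_types; bq_columns and bq_schema
# are derived from it. A hypothetical variant (not part of the original demo):
#
# bq_types = {
#     'first_name': bigquery.enums.SqlTypeNames.STRING,
#     'last_name': bigquery.enums.SqlTypeNames.STRING,
#     'age': bigquery.enums.SqlTypeNames.INTEGER,
#     'created_at': bigquery.enums.SqlTypeNames.TIMESTAMP,
# }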

if __name__ == "__main__":
    print(f'Creating {DATASET_NAME}.{TABLE_NAME}')
    try:
        get_service_account()
    except Exception:
        print('Could not get service account. Is the service account json set?')
        raise
    try:
        do_initial_setup()
    except Exception:
        print('Could not add table. Assuming it has already been created. Continuing.')
    data = [
        {
            'first_name': 'Tomas',
            'last_name': 'Albertsen'
        }
    ]
    send_to_bq(data)
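
To check that the rows actually landed, a read-back query can be added to the script. This is a minimal sketch (not part of the original gist) that reuses the helpers above:

def verify_upload():
    client = get_client()
    project_id = get_project_id()
    query = (
        'SELECT first_name, last_name '
        f'FROM `{project_id}.{DATASET_NAME}.{TABLE_NAME}`'
    )
    for row in client.query(query).result():
        print(row.first_name, row.last_name)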
# Import these dependencies with
# "pip install -r requirements.txt"
pandas==1.3.2
requests==2.26.0
google-auth==2.0.1
google-cloud-bigquery==2.28.1
pyarrow==5.0.0
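# Note: pyarrow is not imported directly in the script, but
# load_table_from_dataframe uses it under the hood to serialize the
# pandas DataFrame for upload.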