Functions for moving data between a pandas DataFrame and Google Cloud Storage/BigQuery using PyArrow and the Google Cloud client libraries.
""" | |
Check this stackeroverflow question: https://stackoverflow.com/questions/68303327/unnecessary-list-item-nesting-in-bigquery-schemas-from-pyarrow-upload-dataframe | |
Check this github issue: https://github.com/googleapis/python-bigquery/issues/19 | |
""" | |
# dataframe_to_arrow() and the `schema` module used below come from this
# private helper module inside google-cloud-bigquery.
from google.cloud.bigquery._pandas_helpers import *
from google.cloud.bigquery import _helpers
from google.cloud import storage
from google.cloud import bigquery
def upload_to_gcs(df, bq_schema, bucket, key, service_account_json):
    # Verify that a usable pyarrow is installed before importing its submodules.
    pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
    import pyarrow.parquet

    # Convert the DataFrame to an Arrow table using the BigQuery schema, then
    # serialize it to Parquet with compliant nested types so REPEATED fields
    # do not pick up an extra `list.item` nesting level.
    bq_schema = schema._to_schema_fields(bq_schema)
    arrow_table = dataframe_to_arrow(df, bq_schema)
    writer = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(arrow_table, writer, use_compliant_nested_type=True)
    body = bytes(writer.getvalue())

    # Upload the serialized Parquet bytes to GCS.
    client = storage.Client.from_service_account_json(service_account_json)
    bucket = storage.Bucket(client, bucket)
    blob = bucket.blob(key)
    blob.upload_from_string(body)
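
For context, here is a minimal usage sketch. The bucket name, object key, and
key-file path are placeholders, and the REPEATED field illustrates the nested
schema case the linked issue is about:

import pandas as pd
from google.cloud import bigquery

df = pd.DataFrame({"name": ["a", "b"], "tags": [["x", "y"], ["z"]]})
bq_schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("tags", "STRING", mode="REPEATED"),  # the problematic case
]
upload_to_gcs(df, bq_schema, "my-bucket", "data/example.parquet", "sa.json")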
def gcs_to_bq(table_id, bq_schema, uri, service_account_json, write_disposition):
    client = bigquery.Client.from_service_account_json(service_account_json)

    # Create the destination table up front so the load job uses the explicit
    # schema instead of relying on autodetection.
    table = bigquery.Table(table_id, schema=bq_schema)
    client.create_table(table, exists_ok=True)

    # Enable list inference so compliant nested Parquet lists map to REPEATED
    # fields without the extra `list.item` record wrapper.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.write_disposition = write_disposition
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True
    job_config.parquet_options = parquet_options

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Block until the load job completes.
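
A matching load call might look like the following; the project, dataset, and
URI are again placeholders:

gcs_to_bq(
    table_id="my-project.my_dataset.example",
    bq_schema=bq_schema,
    uri="gs://my-bucket/data/example.parquet",
    service_account_json="sa.json",
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)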
def gcs_to_bq_external_table(table_id, bq_schema, uri, service_account_info):
    client = bigquery.Client.from_service_account_info(service_account_info)
    table = bigquery.Table(table_id, schema=bq_schema)

    # Same list-inference setting, applied to an external table that reads the
    # Parquet files directly from GCS instead of loading them.
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True
    external_config = bigquery.ExternalConfig("PARQUET")
    external_config.source_uris = [uri]
    external_config.parquet_options = parquet_options
    table.external_data_configuration = external_config
    table = client.create_table(table, exists_ok=True)
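
Note that this function takes the parsed service-account mapping (via
from_service_account_info) rather than a file path, so a hypothetical call
would first load the JSON key; all names here are placeholders:

import json

with open("sa.json") as f:
    service_account_info = json.load(f)

gcs_to_bq_external_table(
    table_id="my-project.my_dataset.example_external",
    bq_schema=bq_schema,
    uri="gs://my-bucket/data/example.parquet",
    service_account_info=service_account_info,
)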