Created
March 14, 2019 21:09
-
-
Save vinoaj/293c9c8825229bb102eac32c817d175b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import logging | |
from google.cloud import bigquery | |
# Name of the Google Cloud Storage bucket the extracted table is written to.
BUCKET_NAME = '<your_bucket_name>'
# Google Cloud project ID that owns the BigQuery dataset being exported.
PROJECT = '<your_project_id>'
# Export file format: 'NEWLINE_DELIMITED_JSON', 'CSV' or 'AVRO'.
FILE_FORMAT = 'NEWLINE_DELIMITED_JSON'
# Whether the exported file should be compressed (adds a '.gz' suffix for
# JSON/CSV file names below).
ZIPPED = True
# Compression codec: GZIP for JSON/CSV, SNAPPY or DEFLATE for AVRO; use None
# for no compression.
COMPRESSION_FORMAT = 'GZIP'
def extract_bigquery_table(event, context): | |
pubsub_message = base64.b64decode(event['data']).decode('utf-8') | |
obj = json.loads(pubsub_message) | |
bq_client = bigquery.Client() | |
dataset_id = obj['protoPayload']['serviceData']['jobCompletedEvent']['job']['jobConfiguration']['load']['destinationTable']['datasetId'] | |
table_id = obj['protoPayload']['serviceData']['jobCompletedEvent']['job']['jobConfiguration']['load']['destinationTable']['tableId'] | |
file_name = dataset_id + '/' + dataset_id + '_' + table_id | |
if FILE_FORMAT == "NEWLINE_DELIMITED_JSON": | |
file_name = file_name + '.json' | |
elif FILE_FORMAT == "CSV": | |
file_name = file_name + '.csv' | |
elif FILE_FORMAT == "AVRO": | |
file_name = file_name + '.avro' | |
else: | |
return False | |
if ZIPPED and FILE_FORMAT in ["NEWLINE_DELIMITED_JSON","CSV"]: | |
file_name = file_name + '.gz' | |
destination_uri = 'gs://{}/{}'.format(BUCKET_NAME, file_name) | |
dataset_ref = bq_client.dataset(dataset_id, project=PROJECT) | |
table_ref = dataset_ref.table(table_id) | |
job_config = bigquery.ExtractJobConfig() | |
job_config.compression = COMPRESSION_FORMAT | |
job_config.destination_format = FILE_FORMAT | |
extract_job = bq_client.extract_table( | |
table_ref, | |
destination_uri, | |
job_config=job_config) | |
extract_job.result() | |
print('Exported {}:{}.{} to {}'.format( | |
PROJECT, dataset_id, table_id, destination_uri)) | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment