Created January 20, 2021 14:32
-
-
Save jallum/ec00380c5148e5027c0d61838a1758f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import json | |
import os | |
from google.api_core import retry | |
from google.cloud import bigquery | |
from google.cloud import storage | |
# Target project / dataset / table for the analytics load.
# PROJECT_ID is None when the GCP_PROJECT environment variable is unset; it is
# only used for logging in this module.
# NOTE(review): newer Cloud Functions runtimes may not set GCP_PROJECT — confirm.
PROJECT_ID = os.environ.get("GCP_PROJECT")
BQ_DATASET = "kiosk_analytics_reporting"
BQ_TABLE = "analytics_events"

# API clients are created once, at module import, and shared by every call
# made from this module.
CS = storage.Client()
BQ = bigquery.Client()
def load_analytics_data(event, context):
    """Load a newline-delimited JSON file from Cloud Storage into BigQuery.

    Triggered by a change to a Cloud Storage bucket. Logs the file being
    processed and the target configuration, then delegates the actual load
    to ``_insert_into_bigquery``.

    Args:
        event (dict): Event payload; this function reads its ``'name'``
            (object name) and ``'bucket'`` (bucket name) keys.
        context (google.cloud.functions.Context): Metadata for the event.
            Not used by this function.

    Raises:
        KeyError: If ``event`` lacks ``'name'`` or ``'bucket'``.
    """
    file_name = event['name']
    print(f"Processing file: {file_name}.")
    bucket_name = event['bucket']
    print("bucket_name={},file_name={}".format(bucket_name, file_name))
    print("PROJECT_ID={},BQ_DATASET={},BQ_TABLE={}".format(PROJECT_ID, BQ_DATASET, BQ_TABLE))
    _insert_into_bigquery(bucket_name, file_name)
def _insert_into_bigquery(bucket_name, file_name, batch_size=500):
    """Stream the rows of a newline-delimited JSON blob into BigQuery.

    Every non-blank line of ``gs://<bucket_name>/<file_name>`` is decoded as
    UTF-8, parsed as one JSON value, and sent to ``BQ_DATASET.BQ_TABLE`` in
    the BigQuery client's project via ``insert_rows_json``, at most
    ``batch_size`` rows per request.

    Args:
        bucket_name (str): Cloud Storage bucket that holds the file.
        file_name (str): Name of the newline-delimited JSON object.
        batch_size (int): Maximum number of rows per insert request.
            Defaults to 500; lower it if rows are large, since streaming
            insert requests are size-limited.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        UnicodeDecodeError: If a line is not valid UTF-8.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        BigQueryError: If BigQuery reports errors for a batch. Batches sent
            before the failing one have already been inserted.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    print("insert_into_bigquery")
    blob = CS.get_bucket(bucket_name).blob(file_name)
    # NOTE(review): download_as_string() is deprecated in newer
    # google-cloud-storage releases in favour of download_as_bytes(); kept
    # here for compatibility with older pinned versions.
    payload = blob.download_as_string()
    # bytes.splitlines() only splits on \n, \r and \r\n, so characters such as
    # U+2028 inside JSON strings stay intact. Blank lines (e.g. a trailing
    # newline or the "\r" left by CRLF files) are skipped rather than parsed.
    rows = [
        json.loads(line.decode('utf-8'))
        for line in payload.splitlines()
        if line.strip()
    ]
    # A fully-qualified table ID replaces the deprecated Client.dataset()
    # helper; both resolve the dataset in the client's default project.
    table_id = f"{BQ.project}.{BQ_DATASET}.{BQ_TABLE}"
    insert_retry = retry.Retry(deadline=30)
    # One request per batch instead of one per row: far fewer round trips,
    # and with the default skip_invalid_rows=False BigQuery rejects the whole
    # request if any row in it is invalid.
    for start in range(0, len(rows), batch_size):
        errors = BQ.insert_rows_json(
            table_id,
            json_rows=rows[start:start + batch_size],
            retry=insert_retry,
        )
        if errors:
            # Each error's 'index' is relative to this batch, not the file.
            raise BigQueryError(errors)
class BigQueryError(Exception):
    """Raised when BigQuery reports errors for an insert request.

    The exception message is a JSON array that concatenates the ``'errors'``
    list of every entry passed in; the raw entries are kept on ``errors``.
    """

    def __init__(self, errors):
        message = self._format(errors)
        super().__init__(message)
        self.errors = errors

    def _format(self, errors):
        """Flatten each entry's ``'errors'`` list and serialise it as JSON."""
        flattened = [detail for entry in errors for detail in entry['errors']]
        return json.dumps(flattened)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.