Created
April 27, 2023 08:03
-
-
Save bindiego/f33fc67d33671d28df411eae18276e67 to your computer and use it in GitHub Desktop.
Test BigQuery table existence, then implement business logic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import date
from datetime import datetime
import time
import uuid
# Print the current wall-clock time as whole milliseconds since the Unix epoch.
t = time.time()
t_ms = int(t * 1000)  # int() truncates the fractional millisecond
print(f"The current time in milliseconds: {t_ms}")
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
def test_table_existence(table_id, bq_client=None):
    """Return whether a BigQuery table exists, printing the outcome.

    Args:
        table_id: Table identifier handed to ``get_table``; this script
            passes ``project.dataset.table`` strings.
        bq_client: Optional object exposing ``get_table``. When omitted, the
            module-level ``client`` is looked up at call time, so existing
            one-argument calls behave exactly as before.

    Returns:
        True when ``get_table`` succeeds, False when it raises ``NotFound``.
        Any other exception propagates to the caller.
    """
    active_client = client if bq_client is None else bq_client
    try:
        # Keep only the lookup inside the try so nothing else is masked.
        active_client.get_table(table_id)
    except NotFound:
        print(f"Table {table_id} is not found.")
        return False
    print(f"Table {table_id} already exists.")
    return True
# Table IDs used by this script are written as "project.dataset.table".
project_name = "du-hast-mich"
dataset_name = "tests"
# Same ID as the table named in the INSERT statement later in the script.
t_target = f"{project_name}.{dataset_name}.test"

# event_table_prefix = "events_"
# table_id_prefix = project_name + "." + dataset_name + "." + event_table_prefix
# Build the BigQuery client from a service-account key file.
# The path below is a placeholder and must point at a real JSON key file.
sa_file = "/path/to/your/service-account-file.json"
credentials = service_account.Credentials.from_service_account_file(
    sa_file,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials)
# A table that already exists.
t_0401 = f"{project_name}.{dataset_name}.test0401"

# A table that doesn't exist: "test" followed by today's date as YYYYMMDD.
today = date.today()
t_today = f"{project_name}.{dataset_name}.test{today:%Y%m%d}"
# Test an existing table; when it is present, insert one row whose name is a
# freshly generated UUID.
if test_table_existence(t_0401):
    q = """
        INSERT INTO
            `du-hast-mich.tests.test`
            (name)
        VALUES
            (@name)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            # uuid.uuid4 has to be *called*: str(uuid.uuid4) would store the
            # function object's repr rather than a UUID string.
            bigquery.ScalarQueryParameter("name", "STRING", str(uuid.uuid4())),
        ]
    )
    query_job = client.query(q, job_config=job_config)
    # Wait for the job to finish so a failed INSERT raises here instead of
    # going unnoticed.
    query_job.result()
# Test a non-existing table: check up to three times, pausing 3 seconds
# between checks, and stop as soon as the table is found.
max_attempts = 3
for attempt in range(1, max_attempts + 1):
    if test_table_existence(t_today):
        break
    if attempt < max_attempts:
        # Skip the pause after the final check; no further check follows it.
        print("sleep 3s")
        time.sleep(3)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment