Last active
May 8, 2024 10:54
-
-
Save krisjan-oldekamp/78869851421af2f75325c32302fa2137 to your computer and use it in GitHub Desktop.
Backup BigQuery Views and Scheduled Queries to a Git repository using Python. Full article on stacktonic.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
############################################################ | |
# Author Krisjan Oldekamp / Stacktonic.com | |
# Email [email protected] | |
# Article https://stacktonic.com/article/backup-your-valuable-big-query-views-and-scheduled-queries-using-python | |
############################################################ | |
import os | |
import git | |
import google.oauth2.service_account | |
from google.cloud import bigquery | |
from google.cloud import bigquery_datatransfer # Also enable the Data Transfer API in the GCP console | |
############################################################
# Settings
############################################################

# Backup directory (must end with a trailing "/"). When REPO_COMMIT is True,
# this folder is also used to clone and commit to the Git repository.
LOCAL_PATH_BACKUP = "./gcp-bigquery-sql-backup/"

# When True: clone the repository in REPO_LINK, delete all folders in the
# clone, download the latest views and scheduled queries, and commit them.
REPO_COMMIT = True

REPO_LINK = "[email protected]..."       # Repository link
REPO_KEY_PATH = "../.ssh/..."       # SSH private key for repo auth ("" = no key)
GCP_JSON_KEYPATH = "your-keyfile.json"  # JSON keyfile for the Service Account

# Multiple projects may be listed. The location / region is required for
# downloading scheduled-query SQL (see the "region" column in the GCP
# scheduled query interface).
GCP_PROJECTS_BACKUP = [
    {
        "project_id": "your-gcp-project-id",
        "location": "europe",
    }
]

# End of settings
############################################################
# Authenticate using a Service Account / JSON keyfile
def get_credentials(keyfile_json):
    """Return service-account credentials restricted to the BigQuery scope."""
    bq_scope = "https://www.googleapis.com/auth/bigquery"
    return google.oauth2.service_account.Credentials.from_service_account_file(
        keyfile_json,
        scopes=[bq_scope],
    )
# Normalize a scheduled-query display name so it is safe to use as a filename
def format_name(name):
    """Lower-case *name* and replace every space with an underscore."""
    return name.replace(" ", "_").lower()
# Push commit to repository
def git_push_commit(repo, repo_key_path, repo_local_path):
    """Stage all changes in *repo*, commit them, and push to origin.

    repo            -- git.Repo handle on the local clone
    repo_key_path   -- path to an SSH private key, or "" to use default auth
    repo_local_path -- path of the local clone (kept for interface compatibility)
    """
    commit_message = "Automated commit containing changes to BigQuery views and scheduled queries."
    try:
        if repo_key_path == "":
            repo.git.add(all=True)
            repo.index.commit(commit_message)
            repo.remotes.origin.push()
        else:
            # Bug fix: the environment must be set on *repo*'s own git command
            # object — the original used a fresh git.Git() instance, so the
            # custom GIT_SSH_COMMAND never applied to the push below.
            with repo.git.custom_environment(GIT_SSH_COMMAND="ssh -i " + repo_key_path):
                repo.git.add(all=True)
                repo.index.commit(commit_message)
                repo.remotes.origin.push()
    except Exception as err:
        # Was a bare `except:` that hid every error (including KeyboardInterrupt);
        # keep the best-effort behavior but report what went wrong.
        print("Some error occurred while pushing the commit: {}".format(err))
# Save Google BigQuery views to local filesystem
def save_bigquery_views(credentials, project_id, path):
    """Download the SQL of every view in *project_id* to the local filesystem.

    Files are written to <path><project_id>/bq_views/<dataset>/<view>.sql.
    Returns the number of views saved (0 when the project has no datasets).
    """
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    cnt_views = 0
    # Loop datasets
    for dataset in client_bq.list_datasets():
        dataset_name = dataset.dataset_id
        dataset_ref = dataset.reference
        # Loop tables in dataset; only tables of type VIEW carry view SQL
        for table_item in client_bq.list_tables(dataset_ref):
            if table_item.table_type != "VIEW":
                continue
            table_name = table_item.table_id
            # list_tables returns lightweight items; fetch the full table
            # resource to get the view_query field.
            view = client_bq.get_table(dataset_ref.table(table_name))
            backup_directory = path + project_id + "/bq_views/" + dataset_name
            os.makedirs(backup_directory, exist_ok=True)  # create when missing
            # Context manager fixes the original's leaked file handle
            with open(backup_directory + "/" + table_name + ".sql", "w") as f:
                f.write(view.view_query)
            cnt_views += 1
    return cnt_views
# Save Google BigQuery scheduled queries to local filesystem
def save_bigquery_scheduled_queries(credentials, project_id, location, path):
    """Download the SQL of every scheduled query in *project_id*/*location*.

    Files are written to <path><project_id>/bq_scheduled_queries/<name>.sql.
    Returns the number of scheduled queries saved.
    """
    client_bq_df = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    parent = "projects/{}/locations/{}".format(project_id, location)
    cnt_scheduled_queries = 0
    # Loop all Data Transfer configs in the project. Bug fix: current
    # datatransfer clients reject a positional string argument
    # (TypeError: Invalid constructor input for ListTransferConfigsRequest);
    # passing parent as a keyword lets the client build the request object.
    for element in client_bq_df.list_transfer_configs(parent=parent):
        scheduled_query_name = format_name(element.display_name)
        # Only transfer configs that carry a "query" param are scheduled
        # queries; other transfer types are written out as an empty file,
        # matching the original behavior.
        scheduled_query_sql = ""
        for key, value in element.params.items():
            if key == "query":
                scheduled_query_sql = value
        backup_directory = path + project_id + "/bq_scheduled_queries/"
        os.makedirs(backup_directory, exist_ok=True)  # create when missing
        # Context manager fixes the original's leaked file handle
        with open(backup_directory + "/" + scheduled_query_name + ".sql", "w") as f:
            f.write(scheduled_query_sql)
        cnt_scheduled_queries += 1
    return cnt_scheduled_queries
def execute():
    """Run the full backup.

    Optionally clones the Git repository, downloads all views and scheduled
    queries for every configured project, then optionally commits and pushes.
    """
    credentials = get_credentials(GCP_JSON_KEYPATH)
    print("Found {} GCP projects in settings".format(len(GCP_PROJECTS_BACKUP)))

    repo = None  # only set when REPO_COMMIT is enabled
    if REPO_COMMIT:
        # Clone repository if Git commits are enabled
        print("Git commits enabled. Cloning repository {} to {}".format(REPO_LINK, LOCAL_PATH_BACKUP))
        # Start from a clean checkout: delete any previous local copy
        if os.path.exists(LOCAL_PATH_BACKUP):
            git.rmtree(LOCAL_PATH_BACKUP)
        if REPO_KEY_PATH == "":
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP)
        else:
            # Use the SSH key for authentication
            repo = git.Repo.clone_from(
                REPO_LINK,
                LOCAL_PATH_BACKUP,
                env={"GIT_SSH_COMMAND": "ssh -i " + REPO_KEY_PATH},
            )
        # Remove old backups in the repository, keeping dot-dirs such as .git
        repo_dirs = next(os.walk(LOCAL_PATH_BACKUP))[1]
        for repo_dir in repo_dirs:  # renamed: "dir" shadowed the builtin
            if not repo_dir.startswith("."):
                git.rmtree(os.path.join(LOCAL_PATH_BACKUP, repo_dir))
    else:
        # Only download to local filesystem; create directory when missing
        os.makedirs(LOCAL_PATH_BACKUP, exist_ok=True)

    # Loop through GCP projects and save views and scheduled queries
    for project in GCP_PROJECTS_BACKUP:
        print("-- Starting backup for project: {}".format(project["project_id"]))
        views = save_bigquery_views(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP,
        )
        print("# {} views saved..".format(views))
        scheduled_queries = save_bigquery_scheduled_queries(
            credentials,
            project["project_id"],
            project["location"],
            LOCAL_PATH_BACKUP,
        )
        print("# {} scheduled queries saved..".format(scheduled_queries))

    # Push code to remote repository
    if REPO_COMMIT:
        git_push_commit(repo, REPO_KEY_PATH, LOCAL_PATH_BACKUP)
        print("Pushed code to repository..")
    print("Done.")


if __name__ == "__main__":
    # Guard added so importing this module no longer triggers a backup run
    execute()
@normwarren I came across the same error message. I found the solution in stackoverflow: https://stackoverflow.com/questions/67671525/bigquery-datatransfer-invalid-constructor-input-for-startmanualtransferrunsreq.
It seems that client_bq_df.list_transfer_configs(parent)
is expecting information about the project and location in the format of a dictionary instead of a string.
Following up on @miguel-graindata's comment: replace line 99 with:
parent = dict(parent= "projects/{}/locations/{}".format(project_id, location))
...and it works
(edited to write a better fix than I had)
Have gone a step further - my forked gist for this backs up stored procedures as well, if that's useful to anyone!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi. I get this error -- TypeError: Invalid constructor input for ListTransferConfigsRequest: 'projects/myproject/locations/us-central1'.
I know that I am entering the correct inputs for project/location. I can transfer Views, but get the error for scheduled queries.