Back up BigQuery views, scheduled queries, and stored procedures to a Git repository using Python. Full article on stacktonic.com
############################################################
# Author  Krisjan Oldekamp / Stacktonic.com
# Email   [email protected]
# Article https://stacktonic.com/article/backup-your-valuable-big-query-views-and-scheduled-queries-using-python
############################################################

import os

import git
import google.oauth2.service_account
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer  # Also enable the Data Transfer API in the GCP console
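
# Dependencies, inferred from the imports above: the git module is provided by
# the GitPython package, so the script needs
#   pip install google-cloud-bigquery google-cloud-bigquery-datatransfer GitPython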
############################################################
# Settings

LOCAL_PATH_BACKUP = "./gcp-bigquery-sql-backup/"  # Backup directory (end with a trailing /). When REPO_COMMIT is True, this folder is also used to clone and commit to the Git repository
REPO_COMMIT = True  # When True, the script clones the repository in REPO_LINK, deletes all folders in the clone, downloads the latest versions of the views, scheduled queries and stored procedures into it, and commits the changes
REPO_LINK = "[email protected]..."  # Repository link
REPO_KEY_PATH = "../.ssh/..."  # Path to the SSH private key used to authenticate with the repository. Leave empty when not using an SSH key
GCP_JSON_KEYPATH = "your-keyfile.json"  # Path to the JSON keyfile of the Service Account

# You can specify multiple projects (an example with a second project is sketched below).
# The location / region is required for downloading scheduled query SQL.
GCP_PROJECTS_BACKUP = [
    {
        "project_id": "your-gcp-project-id",
        "location": "europe"  # See the "region" column in the GCP scheduled query interface
    }
]
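
# A hypothetical configuration backing up two projects in different regions
# (project IDs and locations are placeholders, not from the original):
# GCP_PROJECTS_BACKUP = [
#     {"project_id": "project-a", "location": "europe"},
#     {"project_id": "project-b", "location": "us"},
# ]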
# End of settings
############################################################
# Authenticate using a Service Account / JSON keyfile
def get_credentials(keyfile_json):
    scopes = ["https://www.googleapis.com/auth/bigquery"]
    return google.oauth2.service_account.Credentials.from_service_account_file(keyfile_json, scopes=scopes)

# Format a scheduled query name so it can be used in a filename
def format_name(name):
    return name.lower().replace(" ", "_")
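
# For example, format_name("Daily revenue report") returns "daily_revenue_report"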
# Commit and push the backup to the remote repository
def git_push_commit(repo, repo_key_path):
    try:
        if repo_key_path == "":
            repo.git.add(all=True)
            repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
            repo.remotes.origin.push()
        else:
            # Temporarily set GIT_SSH_COMMAND on this repo's Git wrapper so the push authenticates with the configured private key
            with repo.git.custom_environment(GIT_SSH_COMMAND="ssh -i " + repo_key_path):
                repo.git.add(all=True)
                repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
                repo.remotes.origin.push()
    except Exception as e:
        print("An error occurred while pushing the commit: {}".format(e))
# Save Google BigQuery views to the local filesystem
def save_bigquery_views(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    datasets = list(client_bq.list_datasets())
    cnt_views = 0

    # Loop datasets
    for dataset in datasets:
        dataset_name = dataset.dataset_id
        dataset_ref = dataset.reference
        tables = list(client_bq.list_tables(dataset_ref))

        # Loop tables in the dataset, selecting only views
        for table in tables:
            if table.table_type == "VIEW":
                table_name = table.table_id
                view = client_bq.get_table(dataset_ref.table(table_name))

                backup_directory = path + project_id + "/bq_views/" + dataset_name
                os.makedirs(backup_directory, exist_ok=True)  # Create the directory when it doesn't exist

                # Save the view SQL to a file
                with open(backup_directory + "/" + table_name + ".sql", "w") as f:
                    f.write(view.view_query)
                cnt_views += 1

    return cnt_views
# Save Google BigQuery scheduled queries to the local filesystem
def save_bigquery_scheduled_queries(credentials, project_id, location, path):
    client_bq_df = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    parent = "projects/{}/locations/{}".format(project_id, location)
    cnt_scheduled_queries = 0

    # Loop all Data Transfer configs in the project
    for element in client_bq_df.list_transfer_configs({"parent": parent}):
        scheduled_query_name = format_name(element.display_name)
        scheduled_query_sql = ""

        # Loop over the Data Transfer parameters; only scheduled queries carry a "query" parameter
        for key, value in element.params.items():
            if key == "query":
                scheduled_query_sql = value

        # Skip transfer configs that are not scheduled queries
        if scheduled_query_sql == "":
            continue

        backup_directory = path + project_id + "/bq_scheduled_queries/"
        os.makedirs(backup_directory, exist_ok=True)  # Create the directory when it doesn't exist

        # Write the query SQL to a file
        with open(backup_directory + scheduled_query_name + ".sql", "w") as f:
            f.write(scheduled_query_sql)
        cnt_scheduled_queries += 1

    return cnt_scheduled_queries
# Save Google BigQuery stored procedures (and other routines) to the local filesystem
def save_bigquery_stored_procedures(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    datasets = list(client_bq.list_datasets())
    cnt_stored_procedures = 0

    # Loop datasets
    for dataset in datasets:
        dataset_name = dataset.dataset_id
        dataset_ref = dataset.reference
        routines = list(client_bq.list_routines(dataset_ref))

        # Loop routines in the dataset. The routine body is not populated by
        # list_routines, so fetch the full routine before reading it
        for routine in routines:
            routine_body = client_bq.get_routine(routine.reference).body

            backup_directory = path + project_id + "/bq_stored_procedures/" + dataset_name
            os.makedirs(backup_directory, exist_ok=True)  # Create the directory when it doesn't exist

            # Save the routine SQL to a file (one file per routine, so routines don't overwrite each other)
            with open(backup_directory + "/" + routine.routine_id + ".sql", "w") as f:
                f.write(routine_body)
            cnt_stored_procedures += 1

    return cnt_stored_procedures
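
# Taken together, the three save functions above produce this layout under LOCAL_PATH_BACKUP:
#   <project_id>/bq_views/<dataset>/<view>.sql
#   <project_id>/bq_scheduled_queries/<scheduled_query_name>.sql
#   <project_id>/bq_stored_procedures/<dataset>/<routine_id>.sql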
def execute():
    credentials = get_credentials(GCP_JSON_KEYPATH)

    print("Found {} GCP projects in settings".format(len(GCP_PROJECTS_BACKUP)))

    if REPO_COMMIT:
        # Clone the repository if Git commits are enabled
        print("Git commits enabled. Cloning repository {} to {}".format(REPO_LINK, LOCAL_PATH_BACKUP))

        # Delete the folder / repository when it already exists
        if os.path.exists(LOCAL_PATH_BACKUP):
            git.rmtree(LOCAL_PATH_BACKUP)

        if REPO_KEY_PATH == "":
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP)
        else:
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP, env={"GIT_SSH_COMMAND": "ssh -i " + REPO_KEY_PATH})  # Use SSH key

        # Remove old backups in the repository (keep hidden folders such as .git)
        repo_dirs = next(os.walk(LOCAL_PATH_BACKUP))[1]
        for directory in repo_dirs:
            if not directory.startswith("."):
                git.rmtree(os.path.join(LOCAL_PATH_BACKUP, directory))
    else:
        # Only download to the local filesystem. Create the directory when it doesn't exist
        os.makedirs(LOCAL_PATH_BACKUP, exist_ok=True)

    # Loop through the GCP projects and save views, scheduled queries and stored procedures
    for project in GCP_PROJECTS_BACKUP:
        print("-- Starting backup for project: {}".format(project["project_id"]))

        views = save_bigquery_views(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP
        )
        print("# {} views saved..".format(views))

        scheduled_queries = save_bigquery_scheduled_queries(
            credentials,
            project["project_id"],
            project["location"],
            LOCAL_PATH_BACKUP
        )
        print("# {} scheduled queries saved..".format(scheduled_queries))

        stored_procedures = save_bigquery_stored_procedures(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP
        )
        print("# {} stored procedures saved..".format(stored_procedures))

    # Push the backup to the remote repository
    if REPO_COMMIT:
        git_push_commit(repo, REPO_KEY_PATH)
        print("Pushed code to repository..")

    print("Done.")

if __name__ == "__main__":
    execute()
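
# The script is meant to run unattended; a hypothetical cron entry for a
# nightly backup at 03:00 (interpreter and script paths are placeholders):
#   0 3 * * * /usr/bin/python3 /path/to/google_bigquery_backup.py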