Backup BigQuery Views and Scheduled Queries to a Git repository using Python. Full article on stacktonic.com
############################################################
# Author Krisjan Oldekamp / Stacktonic.com
# Email [email protected]
# Article https://stacktonic.com/article/backup-your-valuable-big-query-views-and-scheduled-queries-using-python
############################################################
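# Dependencies (assumed, not pinned in this gist): the script relies on the
# google-cloud-bigquery, google-cloud-bigquery-datatransfer and GitPython packages, e.g.
#   pip install google-cloud-bigquery google-cloud-bigquery-datatransfer GitPython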
import os
import git
import google.oauth2.service_account
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer # Also enable the Data Transfer API in the GCP console
############################################################
# Settings
LOCAL_PATH_BACKUP = "./gcp-bigquery-sql-backup/" # Backup directory (end with a trailing /). When REPO_COMMIT is set to True, this folder will also be used to clone and commit to the Git repository
REPO_COMMIT = True # When set to True, the script will clone the Git repository specified in the REPO_LINK setting, delete all folders in the repo, download the latest version of the views and scheduled queries into the repo, and commit the changes.
REPO_LINK = "[email protected]..." # Repository link
REPO_KEY_PATH = "../.ssh/..." # Path to SSH private key used for authentication with the repository. Leave empty when not using an SSH key.
GCP_JSON_KEYPATH = "your-keyfile.json" # Path to JSON keyfile for Service Account
# You can specify multiple projects. The location / region is required for downloading scheduled query SQL.
GCP_PROJECTS_BACKUP = [
    {
        "project_id": "your-gcp-project-id",
        "location": "europe" # See the "region" column in the GCP scheduled query interface
    }
]
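# Example with multiple projects (hypothetical project IDs and locations):
# GCP_PROJECTS_BACKUP = [
#     { "project_id": "company-marketing-prod", "location": "europe" },
#     { "project_id": "company-analytics-prod", "location": "us" }
# ]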
# End of settings
############################################################
# Authenticate using a Service Account / JSON keyfile
def get_credentials(keyfile_json):
    scopes = ["https://www.googleapis.com/auth/bigquery"]
    return google.oauth2.service_account.Credentials.from_service_account_file(keyfile_json, scopes=scopes)
# Format scheduled query name so it can be used in a filename
def format_name(name):
    return name.lower().replace(" ", "_")
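# For example, a scheduled query named "Daily Revenue Report" is saved as "daily_revenue_report.sql"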
# Push commit to repository
def git_push_commit(repo, repo_key_path, repo_local_path):
    try:
        if repo_key_path == "":
            repo.git.add(all=True)
            repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
            repo.remotes.origin.push()
        else:
            # Use the SSH key for authentication by setting GIT_SSH_COMMAND on this repository
            with repo.git.custom_environment(GIT_SSH_COMMAND="ssh -i " + repo_key_path):
                repo.git.add(all=True)
                repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
                repo.remotes.origin.push()
    except Exception as e:
        print("An error occurred while pushing the commit: {}".format(e))
# Save Google BigQuery views to local filesystem
def save_bigquery_views(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    datasets = list(client_bq.list_datasets())
    cnt_views = 0
    # Loop datasets
    if datasets:
        for dataset in datasets:
            dataset_name = dataset.dataset_id
            dataset_ref = dataset.reference
            tables = list(client_bq.list_tables(dataset_ref))
            # Loop tables in dataset
            for table in tables:
                # Only select views
                if table.table_type == "VIEW":
                    table_name = table.table_id
                    table_ref = dataset_ref.table(table_name)
                    view = client_bq.get_table(table_ref)
                    backup_directory = path + project_id + "/bq_views/" + dataset_name
                    if not os.path.exists(backup_directory):
                        os.makedirs(backup_directory) # Create directory when it doesn't exist
                    # Save view SQL to file
                    with open(backup_directory + "/" + table_name + ".sql", "w+") as f:
                        f.write(view.view_query)
                    cnt_views += 1
    return cnt_views
# Save Google BigQuery scheduled queries to local filesystem
def save_bigquery_scheduled_queries(credentials, project_id, location, path):
    client_bq_df = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    parent = "projects/{}/locations/{}".format(project_id, location)
    cnt_scheduled_queries = 0
    # Loop all Data Transfer configurations in the project
    for element in client_bq_df.list_transfer_configs({"parent": parent}):
        scheduled_query_name = format_name(element.display_name)
        scheduled_query_sql = ""
        # Loop over Data Transfer parameters and get the query SQL
        params = element.params.items()
        for key, value in params:
            if key == "query":
                scheduled_query_sql = value
        # Skip transfer configs that are not scheduled queries (no "query" parameter)
        if scheduled_query_sql == "":
            continue
        backup_directory = path + project_id + "/bq_scheduled_queries/"
        if not os.path.exists(backup_directory): # Create directory when it doesn't exist
            os.makedirs(backup_directory)
        # Write scheduled query SQL to file
        with open(backup_directory + "/" + scheduled_query_name + ".sql", "w+") as f:
            f.write(scheduled_query_sql)
        cnt_scheduled_queries += 1
    return cnt_scheduled_queries
# Save Google BigQuery stored procedures to local filesystem
def save_bigquery_stored_procedures(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    datasets = list(client_bq.list_datasets())
    cnt_stored_procedures = 0
    # Loop datasets
    if datasets:
        for dataset in datasets:
            dataset_name = dataset.dataset_id
            dataset_ref = dataset.reference
            stored_procs = list(client_bq.list_routines(dataset_ref))
            # Loop routines (stored procedures) in dataset
            for stored_proc in stored_procs:
                routine = client_bq.get_routine(stored_proc)
                routine_name = routine.routine_id
                routine_body = routine._properties.get("definitionBody", "")
                backup_directory = path + project_id + "/bq_stored_procedures/" + dataset_name
                if not os.path.exists(backup_directory):
                    os.makedirs(backup_directory) # Create directory when it doesn't exist
                # Save stored procedure SQL to file (one file per routine)
                with open(backup_directory + "/" + routine_name + ".sql", "w+") as f:
                    f.write(routine_body)
                cnt_stored_procedures += 1
    return cnt_stored_procedures
def execute():
    credentials = get_credentials(GCP_JSON_KEYPATH)
    print("Found {} GCP projects in settings".format(len(GCP_PROJECTS_BACKUP)))
    if REPO_COMMIT:
        # Clone repository if Git commits are enabled
        print("Git commits enabled. Cloning repository {} to {}".format(REPO_LINK, LOCAL_PATH_BACKUP))
        # Delete folder / repository when it already exists
        if os.path.exists(LOCAL_PATH_BACKUP):
            git.rmtree(LOCAL_PATH_BACKUP)
        if REPO_KEY_PATH == "":
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP)
        else:
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP, env={"GIT_SSH_COMMAND": "ssh -i " + REPO_KEY_PATH}) # Use SSH key
        # Remove old backups in repository
        repo_dirs = next(os.walk(LOCAL_PATH_BACKUP))[1]
        for repo_dir in repo_dirs:
            if not repo_dir.startswith("."):
                git.rmtree(os.path.join(LOCAL_PATH_BACKUP, repo_dir))
    else:
        # Only download to local filesystem. Create directory when it doesn't exist
        if not os.path.exists(LOCAL_PATH_BACKUP):
            os.makedirs(LOCAL_PATH_BACKUP)
    # Loop through GCP projects and save views, scheduled queries and stored procedures
    for project in GCP_PROJECTS_BACKUP:
        print("-- Starting backup for project: {}".format(project["project_id"]))
        views = save_bigquery_views(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP
        )
        print("# {} views saved..".format(views))
        scheduled_queries = save_bigquery_scheduled_queries(
            credentials,
            project["project_id"],
            project["location"],
            LOCAL_PATH_BACKUP
        )
        print("# {} scheduled queries saved..".format(scheduled_queries))
        stored_procedures = save_bigquery_stored_procedures(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP
        )
        print("# {} stored procedures saved..".format(stored_procedures))
    # Push code to remote repository
    if REPO_COMMIT:
        git_push_commit(repo, REPO_KEY_PATH, LOCAL_PATH_BACKUP)
        print("Pushed code to repository..")
    print("Done.")

if __name__ == "__main__":
    execute()
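# Usage note (assumption, not part of the original gist): run the script manually
# or on a schedule, e.g. a daily cron entry such as:
#   0 6 * * * python3 /path/to/backup_bigquery.py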