Backup BigQuery Views and Scheduled Queries to a Git repository using Python. Full article on stacktonic.com
############################################################
# Author Krisjan Oldekamp / Stacktonic.com
# Email [email protected]
# Article https://stacktonic.com/article/backup-your-valuable-big-query-views-and-scheduled-queries-using-python
############################################################
import os
import git
import google.oauth2.service_account
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer # Also enable the Data Transfer API in the GCP console
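# (The Data Transfer API can also be enabled from the command line, e.g.
#  "gcloud services enable bigquerydatatransfer.googleapis.com", assuming the gcloud CLI is installed.)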
############################################################
# Settings
LOCAL_PATH_BACKUP = "./gcp-bigquery-sql-backup/" # Backup directory (end with a trailing /). When REPO_COMMIT is set to True, this folder will also be used to clone and commit to the Git repository
REPO_COMMIT = True # When set to True, the script will clone the Git repository specified in the REPO_LINK setting. It will then delete all folders in the repo, download the latest version of the views and scheduled queries into the repo and commit the changes.
REPO_LINK = "[email protected]..." # Repository link
REPO_KEY_PATH = "../.ssh/..." # Path to SSH private key used for authentication with the repository. Leave empty when not using an SSH key.
GCP_JSON_KEYPATH = "your-keyfile.json" # Path to JSON keyfile for Service Account
# You can specify multiple projects. The location / region is required for downloading scheduled query SQL.
GCP_PROJECTS_BACKUP = [
    {
        "project_id": "your-gcp-project-id",
        "location": "europe" # See the "region" column in the GCP scheduled query interface
    }
]
# End of settings
############################################################
# Authenticate using a Service Account / JSON keyfile
def get_credentials(keyfile_json):
    scopes = ['https://www.googleapis.com/auth/bigquery']
    return (
        google.oauth2.service_account.Credentials.from_service_account_file(keyfile_json, scopes=scopes)
    )
# Format scheduled query name so it can be used in a filename
def format_name(name):
    return name.lower().replace(" ", "_")
# Push commit to repository
def git_push_commit(repo, repo_key_path, repo_local_path):
    try:
        if repo_key_path == "":
            repo.git.add(all=True)
            repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
            repo.remotes.origin.push()
        else:
            # Use the repository's Git command object so the SSH key is picked up by the push
            with repo.git.custom_environment(GIT_SSH_COMMAND="ssh -i " + repo_key_path):
                repo.git.add(all=True)
                repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.")
                repo.remotes.origin.push()
    except Exception as e:
        print("An error occurred while pushing the commit: {}".format(e))
# Save Google BigQuery views to local filesystem
def save_bigquery_views(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    datasets = list(client_bq.list_datasets())
    cnt_views = 0

    # Loop datasets
    if datasets:
        for dataset in datasets:
            dataset_name = dataset.dataset_id
            dataset_ref = dataset.reference
            tables = list(client_bq.list_tables(dataset_ref))

            # Loop tables in dataset
            for table in tables:
                # Only select VIEW
                if table.table_type == "VIEW":
                    table_name = table.table_id
                    table_ref = dataset_ref.table(table_name)
                    table = client_bq.get_table(table_ref)

                    backup_directory = path + project_id + "/bq_views/" + dataset_name
                    if not os.path.exists(backup_directory):
                        os.makedirs(backup_directory) # Create directory when it doesn't exist

                    # Save view SQL to file
                    with open(backup_directory + "/" + table_name + ".sql", "w+") as f:
                        f.write(table.view_query)
                    cnt_views += 1

    return cnt_views
# Save Google BigQuery scheduled queries to local filesystem
def save_bigquery_scheduled_queries(credentials, project_id, location, path):
    client_bq_df = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)
    parent = "projects/{}/locations/{}".format(project_id, location)
    cnt_scheduled_queries = 0

    # Loop all Data Transfer elements in project
    for element in client_bq_df.list_transfer_configs(parent):
        scheduled_query_name = format_name(element.display_name)
        scheduled_query_sql = ""

        # Loop over Data Transfer parameters, select scheduled queries and get query SQL
        params = element.params.items()
        for key, value in params:
            if key == "query":
                scheduled_query_sql = value

                backup_directory = path + project_id + "/bq_scheduled_queries/"
                if not os.path.exists(backup_directory): # Create directory when it doesn't exist
                    os.makedirs(backup_directory)

                # Write to file
                with open(backup_directory + "/" + scheduled_query_name + ".sql", "w+") as f:
                    f.write(scheduled_query_sql)
                cnt_scheduled_queries += 1

    return cnt_scheduled_queries
def execute():
    credentials = get_credentials(GCP_JSON_KEYPATH)

    print("Found {} GCP projects in settings".format(len(GCP_PROJECTS_BACKUP)))

    if REPO_COMMIT:
        # Clone repository if Git commits are enabled
        print("Git commits enabled. Cloning repository {} to {}".format(REPO_LINK, LOCAL_PATH_BACKUP))

        # Delete folder / repository when it already exists
        if os.path.exists(LOCAL_PATH_BACKUP):
            git.rmtree(LOCAL_PATH_BACKUP)

        if REPO_KEY_PATH == "":
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP)
        else:
            repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP, env={"GIT_SSH_COMMAND": "ssh -i " + REPO_KEY_PATH}) # Use SSH key

        # Remove old backups in repository
        repo_dirs = next(os.walk(LOCAL_PATH_BACKUP))[1]
        for repo_dir in repo_dirs:
            if not repo_dir.startswith("."):
                git.rmtree(os.path.join(LOCAL_PATH_BACKUP, repo_dir))
    else:
        # Only download to local filesystem. Create directory when it doesn't exist
        if not os.path.exists(LOCAL_PATH_BACKUP):
            os.makedirs(LOCAL_PATH_BACKUP)

    # Loop through GCP projects and save views and scheduled queries
    for project in GCP_PROJECTS_BACKUP:
        print("-- Starting backup for project: {}".format(project["project_id"]))

        views = save_bigquery_views(
            credentials,
            project["project_id"],
            LOCAL_PATH_BACKUP
        )
        print("# {} views saved..".format(views))

        scheduled_queries = save_bigquery_scheduled_queries(
            credentials,
            project["project_id"],
            project["location"],
            LOCAL_PATH_BACKUP
        )
        print("# {} scheduled queries saved..".format(scheduled_queries))

    # Push code to remote repository
    if REPO_COMMIT:
        git_push_commit(repo, REPO_KEY_PATH, LOCAL_PATH_BACKUP)
        print("Pushed code to repository..")

    print("Done.")

execute()
cgrinham commented May 6, 2022

Thanks @krisjan-oldekamp, works perfectly.

normwarren commented May 17, 2022

Hi. I get this error -- TypeError: Invalid constructor input for ListTransferConfigsRequest: 'projects/myproject/locations/us-central1'.
I know that I am entering the correct inputs for project/location. I can transfer Views, but get the error for scheduled queries.

@miguel-graindata

@normwarren I came across the same error message. I found the solution on Stack Overflow: https://stackoverflow.com/questions/67671525/bigquery-datatransfer-invalid-constructor-input-for-startmanualtransferrunsreq.

It seems that client_bq_df.list_transfer_configs(parent) expects the information about the project and location in the format of a dictionary instead of a string.

rich-ard commented Dec 12, 2022

Following up on @miguel-graindata's comment: replace the parent assignment in save_bigquery_scheduled_queries with:

parent = dict(parent="projects/{}/locations/{}".format(project_id, location))

...and it works

(edited to write a better fix than I had)
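A minimal sketch of an equivalent fix, assuming the v2+ google-cloud-bigquery-datatransfer client (which also accepts the flattened request fields as keyword arguments), is to keep parent as a plain string and pass it by keyword:

parent = "projects/{}/locations/{}".format(project_id, location)
for element in client_bq_df.list_transfer_configs(parent=parent):
    ...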

@rich-ard

Have gone a step further - my forked gist for this backs up stored procedures as well, if that's useful to anyone!
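For anyone who wants the same idea without the fork, here is a rough, untested sketch of such a function, following the same pattern as save_bigquery_views above and using the standard BigQuery routines API (the function name and the "bq_routines" folder are made up for this example):

import os
from google.cloud import bigquery

def save_bigquery_routines(credentials, project_id, path):
    client_bq = bigquery.Client(credentials=credentials, project=project_id)
    cnt_routines = 0

    # Loop datasets and routines (stored procedures, UDFs)
    for dataset in client_bq.list_datasets():
        for routine_item in client_bq.list_routines(dataset.reference):
            # Fetch the full routine so the body (SQL definition) is populated
            routine = client_bq.get_routine(routine_item.reference)

            backup_directory = path + project_id + "/bq_routines/" + dataset.dataset_id
            if not os.path.exists(backup_directory):
                os.makedirs(backup_directory)

            # Save routine body to file
            with open(backup_directory + "/" + routine.reference.routine_id + ".sql", "w+") as f:
                f.write(routine.body or "")
            cnt_routines += 1

    return cnt_routines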
