-
-
Save krisjan-oldekamp/78869851421af2f75325c32302fa2137 to your computer and use it in GitHub Desktop.
############################################################ | |
# Author Krisjan Oldekamp / Stacktonic.com | |
# Email [email protected] | |
# Article https://stacktonic.com/article/backup-your-valuable-big-query-views-and-scheduled-queries-using-python | |
############################################################ | |
import os | |
import git | |
import google.oauth2.service_account | |
from google.cloud import bigquery | |
from google.cloud import bigquery_datatransfer # Also enable the Data Transfer API in the GCP console | |
############################################################ | |
# Settings | |
LOCAL_PATH_BACKUP = "./gcp-bigquery-sql-backup/" # Backup directory (end with a trailing /). When REPO_COMMIT is set to True, this folder will also be used to clone and commit to the Git repository | |
REPO_COMMIT = True # When set to True, the script will clone the Git repository specified in the REPO_LINK setting. Then it will delete all the folders in the repo, download the latest version of the views and scheduled queries to the repo and commits the changes. | |
REPO_LINK = "[email protected]..." # Repository link | |
REPO_KEY_PATH = "../.ssh/..." # Path to SSH private key used for authentication with the repository. Leave empty when not using an SSH key. | |
GCP_JSON_KEYPATH = "your-keyfile.json" # Path to JSON keyfile for Service Account | |
# You can specify multiple projects. The location / region is required for downloading scheduled query SQL. | |
GCP_PROJECTS_BACKUP = [ | |
{ | |
"project_id": "your-gcp-project-id", | |
"location": "europe" # See the "region" column in the GCP scheduled query interface | |
} | |
] | |
# End of settings | |
############################################################ | |
# Authenticate use a Service account / JSON keyfile | |
def get_credentials(keyfile_json): | |
scopes=['https://www.googleapis.com/auth/bigquery'] | |
return ( | |
google.oauth2.service_account.Credentials.from_service_account_file(keyfile_json, scopes=scopes) | |
) | |
# Format scheduled query name so it can be used in a filename | |
def format_name(name): | |
return name.lower().replace(" ", "_") | |
# Push commit to repository | |
def git_push_commit(repo, repo_key_path, repo_local_path): | |
try: | |
if repo_key_path == "": | |
repo.git.add(all=True) | |
repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.") | |
repo.remotes.origin.push() | |
else: | |
with git.Git().custom_environment(GIT_SSH_COMMAND="ssh -i " + repo_key_path): | |
repo.git.add(all=True) | |
repo.index.commit("Automated commit containing changes to BigQuery views and scheduled queries.") | |
repo.remotes.origin.push() | |
except: | |
print("Some error occured while pushing the commit") | |
# Save Google BigQuery views to local filesystem | |
def save_bigquery_views(credentials, project_id, path): | |
client_bq = bigquery.Client(credentials=credentials, project=project_id) | |
datasets = list(client_bq.list_datasets()) | |
cnt_views = 0 | |
# Loop datasets | |
if datasets: | |
for dataset in datasets: | |
dataset_name = dataset.dataset_id | |
dataset_ref = dataset.reference | |
tables = list(client_bq.list_tables(dataset_ref)) | |
# Loop tables in dataset | |
for table in tables: | |
# Only select VIEW | |
if table.table_type == "VIEW": | |
table_name = table.table_id | |
table_ref = dataset_ref.table(table_name) | |
table = client_bq.get_table(table_ref) | |
backup_directory = path + project_id + "/bq_views/" + dataset_name | |
if not os.path.exists(backup_directory): | |
os.makedirs(backup_directory) # Create directory when it doesn't exist | |
# Save view SQL to file | |
f = open(backup_directory + "/" + table_name + ".sql", "w+") # Save view SQL to file | |
f.write(table.view_query) | |
cnt_views+=1 | |
return cnt_views | |
# Save Google BigQuery scheduled queries to local filesystem | |
def save_bigquery_scheduled_queries(credentials, project_id, location, path): | |
client_bq_df = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials) | |
parent = "projects/{}/locations/{}".format(project_id, location) | |
cnt_scheduled_queries = 0 | |
# Loop all Data Transfer elements in project | |
for element in client_bq_df.list_transfer_configs(parent): | |
scheduled_query_name = format_name(element.display_name) | |
scheduled_query_sql = "" | |
# Loop over Data Transfer parameters, select scheduled queries and get query SQL | |
params = element.params.items() | |
for key, value in params: | |
if key == "query": | |
scheduled_query_sql = value | |
backup_directory = path + project_id + "/bq_scheduled_queries/" | |
if not os.path.exists(backup_directory): # Create directory when it doesn't exist | |
os.makedirs(backup_directory) | |
# Write to file | |
f = open(backup_directory + "/" + scheduled_query_name + ".sql", "w+") | |
f.write(scheduled_query_sql) | |
cnt_scheduled_queries+=1 | |
return cnt_scheduled_queries | |
def execute(): | |
credentials = get_credentials(GCP_JSON_KEYPATH) | |
print ("Found {} GCP projects in settings".format(len(GCP_PROJECTS_BACKUP))) | |
if REPO_COMMIT: | |
# Clone repository if Git commits are enabled | |
print("Git commits enabled. Cloning repository {} to {}".format(REPO_LINK, LOCAL_PATH_BACKUP)) | |
# Delete folder / repository when existing | |
if os.path.exists(LOCAL_PATH_BACKUP): | |
git.rmtree(LOCAL_PATH_BACKUP) | |
if REPO_KEY_PATH == "": | |
repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP) | |
else: | |
repo = git.Repo.clone_from(REPO_LINK, LOCAL_PATH_BACKUP, env={"GIT_SSH_COMMAND": "ssh -i " + REPO_KEY_PATH}) # Use SSH key | |
# Remove old backups in repository | |
repo_dirs = next(os.walk(LOCAL_PATH_BACKUP))[1] | |
for dir in repo_dirs: | |
if not dir.startswith("."): | |
git.rmtree(os.path.join(LOCAL_PATH_BACKUP, dir)) | |
else: | |
# Only download to local filesystem. Create directory when it doesn't exist | |
if not os.path.exists(LOCAL_PATH_BACKUP): | |
os.makedirs(LOCAL_PATH_BACKUP) | |
# Loop through GCP project and save views and scheduled queries | |
for i, project in enumerate(GCP_PROJECTS_BACKUP): | |
print ("-- Starting backup for project: {}".format(project["project_id"])) | |
views = save_bigquery_views( | |
credentials, | |
project["project_id"], | |
LOCAL_PATH_BACKUP | |
) | |
print("# {} views saved..".format(views)) | |
scheduled_queries = save_bigquery_scheduled_queries( | |
credentials, project["project_id"], | |
project["location"], | |
LOCAL_PATH_BACKUP | |
) | |
print("# {} scheduled queries saved..".format(scheduled_queries)) | |
# Push code to remote repository | |
if REPO_COMMIT: | |
git_push_commit(repo, REPO_KEY_PATH, LOCAL_PATH_BACKUP) | |
print("Pushed code to repository..") | |
print("Done.") | |
execute() |
@richardbiddle You're completely right, updated the snippet.
Thanks @krisjan-oldekamp works perfectly
Hi. I get this error -- TypeError: Invalid constructor input for ListTransferConfigsRequest: 'projects/myproject/locations/us-central1'.
I know that I am entering the correct inputs for project/location. I can transfer Views, but get the error for scheduled queries.
@normwarren I came across the same error message. I found the solution in stackoverflow: https://stackoverflow.com/questions/67671525/bigquery-datatransfer-invalid-constructor-input-for-startmanualtransferrunsreq.
It seems that client_bq_df.list_transfer_configs(parent)
is expecting infromation about the project and location in the format of a dictionary instead of a string.
Following up on @miguel-graindata's comment: replace line 99 with:
parent = dict(parent= "projects/{}/locations/{}".format(project_id, location))
...and it works
(edited to write a better fix than I had)
Have gone a step further - my forked gist for this backs up stored procedures as well, if that's useful to anyone!
Looks to be missing f.write(table.view_query) in save_bigquery_views https://gist.github.com/krisjan-oldekamp/78869851421af2f75325c32302fa2137#file-google_bigquery_backup_views_scheduled_queries_git-py-L90