How to use the BigQuery Data Transfer Service from Python for scheduled cross-regional dataset copies
"""
How to create cross-region scheduled bigquery dataset copy jobs in GCP using python
______________________________________________________________
* use python 3.7+
* install requirements
google-api-python-client==1.12.8
google-auth==1.23.0
google-auth-oauthlib==0.4.2
google-auth-httplib2==0.0.4
google-cloud-bigquery==2.6.0
google.cloud==0.34.0
protobuf==4.0.0rc2
google-cloud-bigquery-datatransfer==2.1.0
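for example, with the versions above saved as requirements.txt:
$ pip install -r requirements.txt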
* create source project and destination project
* create service account in source project
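for example (project ids and the account name are placeholders):
$ gcloud projects create {source project id}
$ gcloud projects create {destination project id}
$ gcloud iam service-accounts create {account name} --project={source project id}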
* enable the google data transfer API on both the source and destination projects
https://console.cloud.google.com/apis/api/bigquerydatatransfer.googleapis.com/overview
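or from the command line (run it against both projects):
$ gcloud services enable bigquerydatatransfer.googleapis.com --project={source project id}
$ gcloud services enable bigquerydatatransfer.googleapis.com --project={destination project id}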
* add roles to your service account via IAM of destination project
add these roles to your service account:
BigQuery Admin
BigQuery Data Transfer Service Agent
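for example via gcloud (the role ids below are my mapping of the console names above,
double check them in the IAM docs):
$ gcloud projects add-iam-policy-binding {destination project id} \
    --member='serviceAccount:{service account email}' \
    --role='roles/bigquery.admin'
$ gcloud projects add-iam-policy-binding {destination project id} \
    --member='serviceAccount:{service account email}' \
    --role='roles/bigquerydatatransfer.serviceAgent'
(same pattern for the source project below, plus roles/iam.serviceAccountTokenCreator)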
* add roles to your service account via IAM of source project
add these roles to your service account:
BigQuery Admin
BigQuery Data Transfer Service Agent
Service Account Token Creator
* allow the google data transfer service agent to create tokens for your service account
$ gcloud iam service-accounts add-iam-policy-binding {service account email} \
    --member='serviceAccount:service-{user_project_number}@'\
    'gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com' \
    --role='roles/iam.serviceAccountTokenCreator'
reading material:
* https://cloud.google.com/bigquery/docs/copying-datasets
* https://cloud.google.com/bigquery/docs/enable-transfer-service
"""
import logging
import os
import time
from typing import List
import google.protobuf.json_format
from google.protobuf.timestamp_pb2 import Timestamp
from google.api_core.exceptions import ResourceExhausted
from google.cloud import bigquery_datatransfer_v1
bigquery_data_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
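
# the client above uses application default credentials; to pin it to the service
# account from the setup notes instead, something along these lines should work
# (a sketch, not part of the original gist; the key file path is an assumption):
#
#   from google.oauth2 import service_account
#   credentials = service_account.Credentials.from_service_account_file("service-account.json")
#   bigquery_data_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient(
#       credentials=credentials
#   )
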
SOURCE_PROJECT = "source"
DESTINATION_PROJECT = "destination"
DESTINATION_PROJECT_LOCATION = "europe-west3"
DATASET_NAME = "dataset"
NOTIFICATION_TOPIC = os.environ.get("NOTIFICATION_TOPIC")
SUPPORTED_LOCATIONS = [
"us-west4",
"us-west2",
"northamerica-northeast1",
"us-east4",
"us-west1",
"us-west3",
"southamerica-east1",
"us-east1",
"europe-west1",
"europe-north1",
"europe-west3",
"europe-west2",
"europe-west4",
"europe-west6",
"asia-east2",
"asia-southeast2",
"asia-south1",
"asia-northeast2",
"asia-northeast3",
"asia-southeast1",
"australia-southeast1",
"asia-east1",
"asia-northeast1",
]


def paginate_transfer_configs():
    """Yield every transfer config in the destination project across all supported locations."""
    logging.warning(
        f"going through all {len(SUPPORTED_LOCATIONS)} supported locations, might take a few moments"
    )
    for location in SUPPORTED_LOCATIONS:
        request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
            {
                "parent": f"projects/{DESTINATION_PROJECT}/locations/{location}",
            }
        )
        try:
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        except ResourceExhausted:
            # listing 20+ locations back to back can hit the API quota, so back off once and retry
            logging.warning(
                "got throttled during fetch of transfer config, waiting 5 seconds and retrying"
            )
            time.sleep(5)
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        for transfer_config in transfer_configs:
            yield transfer_config


def get_transfer_config():
    """Return the transfer config targeting DATASET_NAME in the destination location, if any."""
    request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
        {
            "parent": f"projects/{DESTINATION_PROJECT}/locations/{DESTINATION_PROJECT_LOCATION}",
        }
    )
    try:
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)
    except ResourceExhausted:
        logging.warning(
            "got throttled during fetch of transfer config, waiting 1 second and retrying"
        )
        time.sleep(1)
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)
    for transfer_config in transfer_configs:
        if transfer_config.destination_dataset_id == DATASET_NAME:
            return transfer_config


def create_transfer_config():
    """Create a scheduled cross-region copy of DATASET_NAME into the destination project."""
    transfer_config = {
        "destination_dataset_id": DATASET_NAME,
        "display_name": DATASET_NAME,
        "data_source_id": "cross_region_copy",
        "params": {
            "source_dataset_id": DATASET_NAME,
            "source_project_id": SOURCE_PROJECT,
            "overwrite_destination_table": True,
        },
        "schedule": "every day 04:00",
    }
    if NOTIFICATION_TOPIC:
        transfer_config["notification_pubsub_topic"] = NOTIFICATION_TOPIC
    # params is a protobuf Struct, so build the message from the plain dict via json_format
    transfer_config = google.protobuf.json_format.ParseDict(
        transfer_config,
        bigquery_datatransfer_v1.types.TransferConfig()._pb,
    )
    transfer_config = bigquery_data_transfer_client.create_transfer_config(
        parent=f"projects/{DESTINATION_PROJECT}", transfer_config=transfer_config
    )
    logging.info(f"created transfer_config with name: {transfer_config.name}")
    return transfer_config


def run_transfer_config(transfer_config):
    """Trigger a manual run of the given transfer config and return the run name."""
    start_time = Timestamp(seconds=int(time.time()))
    request = bigquery_datatransfer_v1.types.StartManualTransferRunsRequest(
        {"parent": transfer_config.name, "requested_run_time": start_time}
    )
    response = bigquery_data_transfer_client.start_manual_transfer_runs(
        request, timeout=360
    )
    logging.info(f"transfer config with name {transfer_config.name} is now running")
    return response.runs[0].name
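

# run_transfer_config only starts the copy; to block until it finishes, a polling helper
# along these lines should work (a sketch, not part of the original gist, built on
# get_transfer_run / TransferState from the same client library):
def wait_for_transfer_run(run_name, poll_seconds=30):
    terminal_states = {
        bigquery_datatransfer_v1.types.TransferState.SUCCEEDED,
        bigquery_datatransfer_v1.types.TransferState.FAILED,
        bigquery_datatransfer_v1.types.TransferState.CANCELLED,
    }
    while True:
        run = bigquery_data_transfer_client.get_transfer_run(
            bigquery_datatransfer_v1.types.GetTransferRunRequest({"name": run_name})
        )
        if run.state in terminal_states:
            logging.info(f"transfer run {run_name} finished with state {run.state.name}")
            return run
        time.sleep(poll_seconds)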


def delete_transfer_config(transfer_config):
    """Delete the given transfer config (and with it its schedule)."""
    request = bigquery_datatransfer_v1.types.DeleteTransferConfigRequest(
        {"name": transfer_config.name}
    )
    bigquery_data_transfer_client.delete_transfer_config(request)
    logging.info(f"deleted transfer_config with name: {transfer_config.name}")


if __name__ == "__main__":
    # smoke test: create a config, list everything, run it once, then clean up
    create_transfer_config()
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)
    transfer_config = get_transfer_config()
    run_transfer_config(transfer_config)
    delete_transfer_config(transfer_config)
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)