""" | |
How to create cross-region scheduled bigquery dataset copy jobs in GCP using python | |
______________________________________________________________ | |
* use python 3.7+ | |
* install requirements | |
google-api-python-client==1.12.8 | |
google-auth==1.23.0 | |
google-auth-oauthlib==0.4.2 | |
google-auth-httplib2==0.0.4 | |
google-cloud-bigquery==2.6.0 | |
google.cloud==0.34.0 | |
protobuf==4.0.0rc2 | |
google-cloud-bigquery-datatransfer==2.1.0 | |
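  for example, in a single pip invocation:
    $ pip install google-api-python-client==1.12.8 google-auth==1.23.0 \
        google-auth-oauthlib==0.4.2 google-auth-httplib2==0.0.4 \
        google-cloud-bigquery==2.6.0 google.cloud==0.34.0 \
        protobuf==4.0.0rc2 google-cloud-bigquery-datatransfer==2.1.0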
* create a source project and a destination project
* create a service account in the source project
* enable the BigQuery Data Transfer API on the source and destination projects
    https://console.cloud.google.com/apis/api/bigquerydatatransfer.googleapis.com/overview
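  the API can also be enabled from the CLI, for example (substitute your
  own project id):
    $ gcloud services enable bigquerydatatransfer.googleapis.com \
        --project={project_id}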
* add roles to your service account via IAM of the destination project
    add these roles to your service account:
      BigQuery Admin
      BigQuery Data Transfer Service Agent
* add roles to your service account via IAM of the source project
    add these roles to your service account:
      BigQuery Admin
      BigQuery Data Transfer Service Agent
      Service Account Token Creator
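  a role can also be granted from the CLI, for example (a sketch; repeat
  for each role id, here roles/bigquery.admin for "BigQuery Admin"):
    $ gcloud projects add-iam-policy-binding {destination_project_id} \
        --member='serviceAccount:{service_account_email}' \
        --role='roles/bigquery.admin'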
* make the service account able to auth with the Google Data Transfer API
    $ gcloud iam service-accounts add-iam-policy-binding {service account email} \
        --member='serviceAccount:service-{user_project_number}@'\
        'gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com' \
        --role='roles/iam.serviceAccountTokenCreator'
reading material:
* https://cloud.google.com/bigquery/docs/copying-datasets
* https://cloud.google.com/bigquery/docs/enable-transfer-service
"""
import logging
import os
import time

import google.protobuf.json_format
from google.api_core.exceptions import ResourceExhausted
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp

# the client picks up application default credentials, e.g. the key file
# pointed to by GOOGLE_APPLICATION_CREDENTIALS
bigquery_data_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
SOURCE_PROJECT = "source"
DESTINATION_PROJECT = "destination"
DESTINATION_PROJECT_LOCATION = "europe-west3"
DATASET_NAME = "dataset"
# optional pub/sub topic for run notifications, as a full resource name:
# projects/{project_id}/topics/{topic_id}
NOTIFICATION_TOPIC = os.environ.get("NOTIFICATION_TOPIC")
SUPPORTED_LOCATIONS = [
    "us-west4",
    "us-west2",
    "northamerica-northeast1",
    "us-east4",
    "us-west1",
    "us-west3",
    "southamerica-east1",
    "us-east1",
    "europe-west1",
    "europe-north1",
    "europe-west3",
    "europe-west2",
    "europe-west4",
    "europe-west6",
    "asia-east2",
    "asia-southeast2",
    "asia-south1",
    "asia-northeast2",
    "asia-northeast3",
    "asia-southeast1",
    "australia-southeast1",
    "asia-east1",
    "asia-northeast1",
]

def paginate_transfer_configs():
    """Yield every transfer config in the destination project, across all supported locations."""
    logging.warning(
        f"going through all {len(SUPPORTED_LOCATIONS)} supported locations, might take a few moments"
    )
    for location in SUPPORTED_LOCATIONS:
        request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
            {
                "parent": f"projects/{DESTINATION_PROJECT}/locations/{location}",
            }
        )
        try:
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        except ResourceExhausted:
            logging.warning(
                "got throttled during fetch of transfer configs, waiting 5 seconds and retrying"
            )
            time.sleep(5)
            transfer_configs = bigquery_data_transfer_client.list_transfer_configs(
                request
            )
        for transfer_config in transfer_configs:
            yield transfer_config

def get_transfer_config():
    """Return the transfer config that copies into DATASET_NAME, or None if it does not exist."""
    request = bigquery_datatransfer_v1.types.ListTransferConfigsRequest(
        {
            "parent": f"projects/{DESTINATION_PROJECT}/locations/{DESTINATION_PROJECT_LOCATION}",
        }
    )
    try:
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)
    except ResourceExhausted:
        logging.warning(
            "got throttled during fetch of transfer config, waiting 1 second and retrying"
        )
        time.sleep(1)
        transfer_configs = bigquery_data_transfer_client.list_transfer_configs(request)
    for transfer_config in transfer_configs:
        if transfer_config.destination_dataset_id == DATASET_NAME:
            return transfer_config

def create_transfer_config():
    """Create a scheduled cross-region copy of DATASET_NAME into the destination project."""
    transfer_config = {
        "destination_dataset_id": DATASET_NAME,
        "display_name": DATASET_NAME,
        "data_source_id": "cross_region_copy",
        "params": {
            "source_dataset_id": DATASET_NAME,
            "source_project_id": SOURCE_PROJECT,
            "overwrite_destination_table": True,
        },
        "schedule": "every day 04:00",
    }
    if NOTIFICATION_TOPIC:
        transfer_config["notification_pubsub_topic"] = NOTIFICATION_TOPIC
    transfer_config = google.protobuf.json_format.ParseDict(
        transfer_config,
        bigquery_datatransfer_v1.types.TransferConfig()._pb,
    )
    transfer_config = bigquery_data_transfer_client.create_transfer_config(
        parent=f"projects/{DESTINATION_PROJECT}", transfer_config=transfer_config
    )
    logging.info(f"created transfer_config with name: {transfer_config.name}")
    return transfer_config
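
# A hedged alternative to the ParseDict round-trip above (not part of the
# original gist): proto-plus message classes accept plain dicts for struct
# fields, so the config can be built directly. A minimal sketch, assuming the
# pinned google-cloud-bigquery-datatransfer version supports keyword
# construction of TransferConfig:
def create_transfer_config_direct():
    transfer_config = bigquery_datatransfer_v1.types.TransferConfig(
        destination_dataset_id=DATASET_NAME,
        display_name=DATASET_NAME,
        data_source_id="cross_region_copy",
        params={
            "source_dataset_id": DATASET_NAME,
            "source_project_id": SOURCE_PROJECT,
            "overwrite_destination_table": True,
        },
        schedule="every day 04:00",
    )
    return bigquery_data_transfer_client.create_transfer_config(
        parent=f"projects/{DESTINATION_PROJECT}", transfer_config=transfer_config
    )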

def run_transfer_config(transfer_config):
    """Trigger a manual run of the given transfer config and return the run name."""
    start_time = Timestamp(seconds=int(time.time()))
    request = bigquery_datatransfer_v1.types.StartManualTransferRunsRequest(
        {"parent": transfer_config.name, "requested_run_time": start_time}
    )
    response = bigquery_data_transfer_client.start_manual_transfer_runs(
        request, timeout=360
    )
    logging.info(f"transfer config with name {transfer_config.name} is now running")
    return response.runs[0].name
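
# Optional helper (not in the original gist): poll a manual run until it
# reaches a terminal state. A minimal sketch, assuming the run name returned
# by run_transfer_config above and the TransferState enum shipped with the
# pinned library version:
def wait_for_transfer_run(run_name, poll_interval_seconds=30):
    terminal_states = {
        bigquery_datatransfer_v1.types.TransferState.SUCCEEDED,
        bigquery_datatransfer_v1.types.TransferState.FAILED,
        bigquery_datatransfer_v1.types.TransferState.CANCELLED,
    }
    while True:
        run = bigquery_data_transfer_client.get_transfer_run(name=run_name)
        if run.state in terminal_states:
            logging.info(f"transfer run {run_name} finished as {run.state.name}")
            return run
        time.sleep(poll_interval_seconds)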

def delete_transfer_config(transfer_config):
    """Delete the given transfer config."""
    request = bigquery_datatransfer_v1.types.DeleteTransferConfigRequest(
        {"name": transfer_config.name}
    )
    bigquery_data_transfer_client.delete_transfer_config(request)
    logging.info(f"deleted transfer_config with name: {transfer_config.name}")

if __name__ == "__main__":
    # demo flow: create the config, list configs in every location, trigger
    # one run, delete the config, then list again to confirm it is gone
    create_transfer_config()
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)
    transfer_config = get_transfer_config()
    run_transfer_config(transfer_config)
    delete_transfer_config(transfer_config)
    for transfer_config in paginate_transfer_configs():
        print(transfer_config)