Run the Cloud Dataflow JDBC->BigQuery template against Cloud SQL for MySQL with a Private IP setup
#!/bin/bash
##Source : Cloud SQL Instance/Table/SQL for extraction
CLOUDSQL_INSTANCE_PROJECT=imfeelinglucky-utilities
CLOUDSQL_INSTANCE_REGION=asia-southeast1
CLOUDSQL_INSTANCE_NAME=testmysql57
CLOUDSQL_INSTANCE_USERNAME=
CLOUDSQL_INSTANCE_USERPASS=
CLOUDSQL_SQL="select * from mysql.user"
##Pipeline env
DATAFLOW_PROJECT=imfeelinglucky-utilities ## project to run the pipeline in
DATAFLOW_REGION=asia-southeast1 ## region in which to run the workers
DATAFLOW_WORKER_NETWORK=default ## network on which the worker nodes will be instantiated; Private Google Access must be enabled (see enablePrivateGoogleAccess below)
##CloudSQL --> BigQuery template specific
GCS_BUCKET_JDBC_DRIVER=gs://imfeelinglucky-jdbc-drivers
GCS_BUCKET_TEMP=gs://imfeelinglucky-jdbc2bq-temp
##Sink : BigQuery Dataset/Table (needs to be pre-created!)
# for a table->table copy, consider extracting the MySQL table DDL, converting it to a BigQuery-compatible DDL, and creating the table from that
BIGQUERY_PROJECT=imfeelinglucky-utilities
BIGQUERY_DATASET=jdbc2bq
BIGQUERY_TABLE=fromdf
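## Since the workers run with --disable-public-ips, the subnet they land on needs
## Private Google Access. A minimal sketch, assuming the auto-mode "default" network,
## where the regional subnet shares the network's name (adjust for a custom VPC):
function enablePrivateGoogleAccess {
  echo "#enablePrivateGoogleAccess"
  gcloud --project ${DATAFLOW_PROJECT} compute networks subnets update ${DATAFLOW_WORKER_NETWORK} \
    --region ${DATAFLOW_REGION} \
    --enable-private-ip-google-access
}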
function prepareDrivers {
  echo "#prepareDrivers"
  ## prebuilt jars can be downloaded from https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/releases
  ## or built from source, as below.
  git clone https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory
  cd cloud-sql-jdbc-socket-factory
  mvn -P jar-with-dependencies clean package -DskipTests
  gsutil ls -b ${GCS_BUCKET_JDBC_DRIVER} || gsutil mb -l ${DATAFLOW_REGION} ${GCS_BUCKET_JDBC_DRIVER}
  find ./jdbc -name "*jar-with-dependencies.jar" | xargs -P 0 -I {} gsutil cp -c {} ${GCS_BUCKET_JDBC_DRIVER}/
  cd ..
  ## MySQL's JDBC driver (Connector/J) still needs to be acquired separately; a sketch follows this function.
  ## https://dev.mysql.com/doc/index-connectors.html
  ## https://www.mysql.com/products/connector/
  ## https://downloads.mysql.com/archives/c-j/
  ## https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.47
  ## Alternatively, you can build an uber-jar with both the driver and the socket factory included.
  ## example pom.xml : https://gist.github.com/n0531m/1ec7cf9fe30a702794c5b406c6705e1b
}
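## A sketch of acquiring Connector/J from Maven Central instead of the download
## archives. The 5.1.49 version matches what initiateDataflowPipeline lists in
## driverJars; adjust both together if you pick a different release:
function prepareMySQLDriver {
  echo "#prepareMySQLDriver"
  curl -L -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
  gsutil cp mysql-connector-java-5.1.49.jar ${GCS_BUCKET_JDBC_DRIVER}/
}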
function prepareSync {
  echo "#prepareSync"
  echo "BigQuery Dataset and Table need to be pre-created (see createBigQuerySink below)"
}
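## A sketch of pre-creating the sink with the bq CLI. The inline schema here is
## hypothetical (two columns of mysql.user); in practice, derive the full schema
## from the MySQL table DDL as noted above:
function createBigQuerySink {
  echo "#createBigQuerySink"
  bq --project_id ${BIGQUERY_PROJECT} --location ${DATAFLOW_REGION} mk --dataset ${BIGQUERY_DATASET}
  bq --project_id ${BIGQUERY_PROJECT} mk --table ${BIGQUERY_DATASET}.${BIGQUERY_TABLE} Host:STRING,User:STRING
}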
function initiateDataflowPipeline {
  CLOUDSQL_INSTANCE_USERNAME=$1
  CLOUDSQL_INSTANCE_USERPASS=$2
  # make sure the temp bucket exists
  gsutil ls -b ${GCS_BUCKET_TEMP} || gsutil mb -l ${DATAFLOW_REGION} ${GCS_BUCKET_TEMP}
  # create a unique job name based on a timestamp
  DATAFLOW_JOB_NAME=jdbc2bq-mysql57-$(TZ=":Asia/Singapore" date '+%Y%m%d%H%M%S')
  # --parameters (for Cloud SQL for MySQL 5.x)
  connectionURL="jdbc:mysql:///mysql?cloudSqlInstance=${CLOUDSQL_INSTANCE_PROJECT}:${CLOUDSQL_INSTANCE_REGION}:${CLOUDSQL_INSTANCE_NAME}&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=${CLOUDSQL_INSTANCE_USERNAME}&password=${CLOUDSQL_INSTANCE_USERPASS}"
  driverClassName=com.mysql.jdbc.Driver
  query="${CLOUDSQL_SQL}"
  outputTable=${BIGQUERY_PROJECT}:${BIGQUERY_DATASET}.${BIGQUERY_TABLE}
  ## JDBC driver and Cloud SQL SocketFactory jars pre-uploaded to Cloud Storage
  driverJars=${GCS_BUCKET_JDBC_DRIVER}/mysql-socket-factory-1.3.5-SNAPSHOT-jar-with-dependencies.jar,${GCS_BUCKET_JDBC_DRIVER}/mysql-connector-java-5.1.49.jar
  bigQueryLoadingTemporaryDirectory=${GCS_BUCKET_TEMP}/bqloadtemp
  ## run the Dataflow template JDBC->BigQuery against Cloud SQL for MySQL (private IP setup)
  ## https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#jdbctobigquery
  ## Template source code can be seen here:
  ## https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/HEAD/src/main/java/com/google/cloud/teleport/templates/JdbcToBigQuery.java
  ## Data extracted from MySQL will be converted to TableRow:
  ## https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/common/JdbcConverters.java
  ## More general Dataflow options (machine type, etc.) can be added as required;
  ## in this case, making sure everything runs in a private IP setup.
  gcloud --project ${DATAFLOW_PROJECT} dataflow jobs run ${DATAFLOW_JOB_NAME} \
    --gcs-location gs://dataflow-templates-asia-southeast1/latest/Jdbc_to_BigQuery \
    --region ${DATAFLOW_REGION} \
    --network ${DATAFLOW_WORKER_NETWORK} \
    --disable-public-ips \
    --staging-location ${GCS_BUCKET_TEMP}/dataflowstaging \
    --parameters "^~^connectionURL=${connectionURL}~driverClassName=${driverClassName}~query=${query}~outputTable=${outputTable}~driverJars=${driverJars}~bigQueryLoadingTemporaryDirectory=${bigQueryLoadingTemporaryDirectory}"
}
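## A sketch for checking on the submitted job. "jobs run" prints a job ID;
## "jobs list" surfaces it again if you lose it, and <JOB_ID> below is a placeholder:
function listDataflowJobs {
  echo "#listDataflowJobs"
  gcloud --project ${DATAFLOW_PROJECT} dataflow jobs list --region ${DATAFLOW_REGION} --status active
  ## then, with a job ID from the list:
  # gcloud --project ${DATAFLOW_PROJECT} dataflow jobs show <JOB_ID> --region ${DATAFLOW_REGION}
}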
function initiateCloudSQLExport {
  ## https://cloud.google.com/sql/docs/mysql/import-export/import-export-csv#export_data_to_a_csv_file
  local SERVICE_ACCOUNT=$(gcloud --project ${CLOUDSQL_INSTANCE_PROJECT} --format json sql instances describe ${CLOUDSQL_INSTANCE_NAME} | jq -r '.serviceAccountEmailAddress')
  local BUCKET=imfeelinglucky-cloudsqlexport
  gsutil ls -b gs://${BUCKET} || gsutil mb -l ${CLOUDSQL_INSTANCE_REGION} gs://${BUCKET}
  gsutil iam ch serviceAccount:${SERVICE_ACCOUNT}:objectAdmin gs://${BUCKET}
  #gsutil iam ch serviceAccount:${SERVICE_ACCOUNT}:roles/storage.objectViewer gs://${BUCKET}
  gcloud sql export csv ${CLOUDSQL_INSTANCE_NAME} gs://${BUCKET}/cloudsqlexport.csv --query "${CLOUDSQL_SQL}" --offload
  #gcloud sql export sql ${CLOUDSQL_INSTANCE_NAME} gs://${BUCKET}/cloudsqlexport.sql -d mysql,sys,performance_schema --offload
}
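## A sketch of loading the exported CSV into the same BigQuery sink with bq load.
## --autodetect infers a schema from the CSV; for a faithful table->table copy,
## pass an explicit schema derived from the MySQL DDL instead:
function loadCloudSQLExportToBigQuery {
  echo "#loadCloudSQLExportToBigQuery"
  bq --project_id ${BIGQUERY_PROJECT} load --source_format CSV --autodetect \
    ${BIGQUERY_DATASET}.${BIGQUERY_TABLE} gs://imfeelinglucky-cloudsqlexport/cloudsqlexport.csv
}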
function usage {
  echo "usage : "
  echo "$0 prepareDrivers"
  echo "$0 initiateDataflowPipeline <dbuser> <dbpass>"
  echo "$0 initiateCloudSQLExport"
}
if [[ "$#" == 0 ]]; then
  usage
else
  "$@"
fi