What it does

It streams all virtual tables (excluding archived ones) into Google BigQuery.

Scheduling this in the cloud

This sets up a server which streams data from sources like Atlas, MongoDB, S3 and Azure Blob Storage into Google BigQuery on a schedule, e.g. every day at midnight or every hour. You don't need anything other than this server. When the ETL process completes, the server is shut down, so if you want to add more tables you'll need to start it back up. Note that if the server is already running when the scheduled ETL process begins, it will be restarted.
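
Concretely, the Cloud Function (index.js below) appends a "restore the original metadata and shut down" command to the instance's startup-script metadata before restarting it, so the VM runs the ETL on boot and then stops itself. With the payload.json shown later, which has an empty originalMetadata, the amended startup script works out to roughly the following (where <username>, <project>, <dataset> and <zone> are the placeholders you fill in):

/home/<username>/sd-bq.sh <project> <dataset> '{}' no_partitioning http://localhost \
  && (gcloud compute instances remove-metadata slamdata-bigquery --all --zone <zone> \
      && gcloud compute instances stop slamdata-bigquery --zone <zone>) \
  || (gcloud compute instances remove-metadata slamdata-bigquery --all --zone <zone> \
      && gcloud compute instances stop slamdata-bigquery --zone <zone>)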

  • Create a fast, high-memory Google Cloud VM instance (https://console.cloud.google.com/compute/instances) called slamdata-bigquery with access scopes set to "Allow full access to all Cloud APIs". In the "Management, security, disks, networking, sole tenancy" section, under "Networking", add the network tag my-ip-web, then edit the network interface and create an external IP address called external. Make a note of this IP address: it's the IP you will use to browse your data and create virtual tables
  • SSH into this instance and make a note of your user name
  • Install the Google Stackdriver logging agent (https://cloud.google.com/logging/docs/agent/installation); this will help with troubleshooting
  • sudo curl -sSL https://get.docker.com/ | sh
  • Go to https://cloud.docker.com/u/slamdata/repository/docker/slamdata/slamdata and find the latest version number; use it in place of 1.5.14 in the following steps
  • sudo docker login
  • sudo docker pull slamdata/slamdata:1.5.14
  • sudo docker volume create vdw-volume
  • sudo docker run --restart=always -d -p 80:80 -v vdw-volume:/var/lib/slamdata/data --name vdw slamdata/slamdata:1.5.14
  • Perform the steps in the installation section below (ignore the standalone usage instructions)
  • Go to https://www.google.com/search?q=whats+my+ip and make a note of your IP address
  • Create a firewall rule (https://console.cloud.google.com/networking/firewalls/add) called slamdata-bigquery with the target tag my-ip-web, a source IP range of your IP address, and TCP ports 80 and 443
  • Create a Google Cloud Function (https://console.cloud.google.com/functions/) called stop-then-start-instance-with-metadata with the Node.js 8 runtime, the index.js and package.json below as its source, a Pub/Sub trigger with a new topic called slamdata-bigquery-start, a timeout of 540 seconds, startInstancePubSub as the function to execute and 128 MB of memory
  • If you don't already have a BigQuery dataset go to https://console.cloud.google.com/bigquery and create one by selecting your project in the resources and then clicking Create dataset
  • Create a Google Cloud Scheduler job (https://console.cloud.google.com/cloudscheduler) with the name slamdata-bigquery-start, the desired frequency (0 0 * * * is daily at midnight) and timezone, target Pub/Sub, topic slamdata-bigquery-start, and the payload.json below as the payload, with the details changed to match your project, zone, VM, username and BigQuery dataset (a gcloud sketch of these deployment steps follows this list)
  • Now any tables you create which are not archived will be streamed into Google BigQuery on the schedule specified
  • We recommend you shut down the instance when you aren't creating tables; the scheduler and function will start it up for your scheduled ETL processes
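
If you prefer the command line, the firewall rule, topic, function and scheduler job above can be created with gcloud, roughly as sketched below. This assumes the Cloud Functions, Pub/Sub and Cloud Scheduler commands are available in your SDK (older SDKs ship some of them under gcloud beta); the source IP, timezone and schedule are placeholders, the function deployment is run from a directory containing the index.js and package.json below, and payload.json has already been edited as described above.

# Firewall rule restricting web access to your own IP address
gcloud compute firewall-rules create slamdata-bigquery --target-tags my-ip-web --source-ranges 203.0.113.7/32 --allow tcp:80,tcp:443

# Pub/Sub topic and the Cloud Function it triggers
gcloud pubsub topics create slamdata-bigquery-start
gcloud functions deploy stop-then-start-instance-with-metadata --runtime nodejs8 --trigger-topic slamdata-bigquery-start --entry-point startInstancePubSub --memory 128MB --timeout 540s

# Scheduler job publishing payload.json daily at midnight
gcloud scheduler jobs create pubsub slamdata-bigquery-start --schedule "0 0 * * *" --time-zone "Etc/UTC" --topic slamdata-bigquery-start --message-body-from-file payload.json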

Testing and troubleshooting

  • In your browser, go to the IP address used to browse your data and create virtual tables (the external IP you noted earlier)
  • Create a new table
  • Add a S3 data source with the bucket uri https://s3.amazonaws.com/post-giraffe and the name Post giraffe
  • Pick the following columns
    • /Source Post Giraffe/Dir app-data/Data user-events.json/All keys/dateTime/Values Column String/As ISO Datetime/Column Datetime
    • /Source Post Giraffe/Dir app-data/Data user-events.json/All keys/dateTime/Values Column Number/As seconds since 1970/Column Datetime
    • /Source Post Giraffe/Dir app-data/Data user-events.json/All keys/S/Key Column Boolean/As number/Column Number
    • /Source Post Giraffe/Dir app-data/Data user-events.json/All keys/MAS/Key Column Boolean/As number/Column Number
  • Merge the columns Values of key dateTime and Values of key dateTime 2
  • Name the virtual table Emails
  • Go to https://console.cloud.google.com/cloudscheduler and run the job slamdata-bigquery-start
  • Go to https://console.cloud.google.com/functions/list, click stop-then-start-instance-with-metadata, click View logs, then click the play button labelled "Start streaming logs"; watch the logs until they stop and "Function execution took" appears
  • Go to https://console.cloud.google.com/compute/instances, click slamdata-bigquery, click Stackdriver Logging, then click the play button labelled "Start streaming logs"; watch until the logs stop and "Stopped Google Compute Engine Network Daemon." appears (a CLI sketch of similar checks follows this list)
  • Go to https://console.cloud.google.com/bigquery, select your project from the resource section, select your dataset, select the table, then click Preview
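
The same checks can be made from a terminal; a rough sketch, assuming the function, instance and table names used above, your own zone and dataset in place of the placeholders, and the gcloud and bq command-line tools:

# Recent Cloud Function logs
gcloud functions logs read stop-then-start-instance-with-metadata --limit 50

# Instance state (TERMINATED once the ETL run has finished and the VM has shut itself down)
gcloud compute instances describe slamdata-bigquery --zone <zone> --format 'value(status)'

# Preview the rows loaded into BigQuery
bq head -n 10 <dataset>.Emails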

Installation

  • sudo apt-get install jq
  • curl https://gist.githubusercontent.com/beckyconning/fd4574cb6766d4c9680b10b0a81b396e/raw/sd-bq.sh > sd-bq.sh
  • curl https://gist.githubusercontent.com/beckyconning/fd4574cb6766d4c9680b10b0a81b396e/raw/sd-bq-single.sh > sd-bq-single.sh
  • chmod +x sd-bq.sh
  • chmod +x sd-bq-single.sh

Standalone usage

If you just want to use this tool without any other setup you may use the following. You don't need this section if you are doing the full setup in Google Cloud with scheduling.

  • Perform the steps in the installation section above
  • Go to https://console.cloud.google.com/iam-admin/serviceaccounts/create for your Google Cloud project and create a service account
  • Grant it the "BigQuery Admin" and "Storage Object Viewer" roles
  • Create a "JSON key type" key and download it
  • Replace $PATH_TO_JSON_KEY in export GOOGLE_APPLICATION_CREDENTIALS=$PATH_TO_JSON_KEY with the path to the JSON key you downloaded, then run this command
  • Replace the $PROJECT_ID etc. in ./sd-bq.sh $PROJECT_ID $BIG_QUERY_DATASET_ID $FLOAT64_COLUMN_NAMES $PARTITIONING $VIRTUAL_TABLE_SERVER_URL with your own values and run this command; a worked example follows this list
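
A worked standalone run, as a sketch; the key path, project, dataset and server URL are placeholders, '{}' means no columns are forced to FLOAT64, and no_partitioning makes each load replace the destination table (WRITE_TRUNCATE) rather than append to day partitions:

export GOOGLE_APPLICATION_CREDENTIALS=/home/me/keys/slamdata-bigquery-key.json
./sd-bq.sh example_project example_dataset '{}' no_partitioning http://slamdata.example.com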

index.js

const Buffer = require('safe-buffer').Buffer;
const Compute = require('@google-cloud/compute');
const reattempt = require('reattempt-promise-function');
const compute = new Compute();
require('es6-promise').polyfill();

exports.startInstancePubSub = (data, context) => {
  const payload = _validatePayload(JSON.parse(Buffer.from(data.data, 'base64').toString()));
  const vm = compute.zone(payload.zone).vm(payload.instance);
  // Append a "restore the original metadata and shut down" command to the startup-script
  // metadata entry, then stop and start the instance so the amended script runs on boot.
  return setMetadata(vm, dekeyvalueize(keyvalueize(payload.metadata).map(predicated(hasKey("startup-script"))(appendValue(finale(resetMetaDataAndShutdown(payload.zone)(keyvalueize(payload.originalMetadata))))))))
    .then(() => vm.stop())
    .then(data => {
      // Operation pending.
      const operation = data[0];
      return operation.promise();
    })
    .then(() => {
      const message = 'Successfully stopped instance ' + payload.instance;
      console.log(message);
      return vm.start();
    })
    .then(data => {
      // Operation pending.
      const operation = data[0];
      return operation.promise();
    })
    .then(() => {
      const message = 'Successfully started instance ' + payload.instance;
      console.log(message);
    });
};

// Sets the VM's metadata, then re-reads it via the API until it matches what was set.
const setMetadata = (vm, metadata) =>
  vm.setMetadata(metadata)
    .then(data => {
      const operation = data[0];
      return operation.promise();
    })
    .then(() =>
      reattempt(() => {
        return vm.getMetadata().then(data => {
          const actual = JSON.stringify(dekeyvalueize(data[0].metadata.items));
          const expected = JSON.stringify(metadata);
          if (actual === expected) {
            return Promise.resolve(data);
          } else {
            return Promise.reject("Retrieved metadata didn't equal " + JSON.stringify(metadata));
          }
        });
      }, [], 1000, 20)
    )
    .then(data => {
      console.log("Successfully set metadata " + JSON.stringify(metadata));
    });

// Renders a metadata array as the comma-separated key="value" list expected by gcloud.
const gcloudFormatize = arr => Array.isArray(arr) ? arr.map(obj => obj.key + "=\"" + obj.value + "\"").join(",") : "";

// Builds the shell command that restores the instance's original metadata and then stops it.
const resetMetaDataAndShutdown = zone => metadata => "gcloud compute instances remove-metadata slamdata-bigquery --all --zone " + zone + (Array.isArray(metadata) && metadata.length > 0 ? " && gcloud compute instances add-metadata slamdata-bigquery --zone " + zone + " --metadata " + gcloudFormatize(metadata) : "") + " && gcloud compute instances stop slamdata-bigquery --zone " + zone;

// Appends s so that it runs whether the preceding command succeeded or failed.
const finale = s => " && (" + s + ") || (" + s + ")";
const predicated = p => f => x => p(x) ? f(x) : x;
const hasKey = key => obj => obj.key && obj.key === key;
const appendValue = suffix => obj => { return { key: obj.key, value: obj.value + suffix } };

// Convert between { key: value } objects and GCE-style [{ key, value }] arrays.
const keyvalueize = obj => Object.keys(obj).map(key => { return { key: key, value: obj[key] }; });
const dekeyvalueize = arr => Array.isArray(arr) ? arr.reduce((acc, obj) => { return { ...acc, [obj.key]: obj.value }; }, {}) : {};

/**
 * Validates that a request payload contains the expected fields.
 *
 * @param {!object} payload the request payload to validate.
 * @returns {!object} the payload object.
 */
function _validatePayload (payload) {
  if (!payload.zone) {
    throw new Error(`Attribute 'zone' missing from payload`);
  } else if (!payload.instance) {
    throw new Error(`Attribute 'instance' missing from payload`);
  } else if (!payload.metadata) {
    throw new Error(`Attribute 'metadata' missing from payload`);
  } else if (!payload.originalMetadata) {
    throw new Error(`Attribute 'originalMetadata' missing from payload`);
  }
  return payload;
}

package.json

{
  "name": "cloud-functions-schedule-instance",
  "version": "0.0.1",
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "engines": {
    "node": ">=6.0"
  },
  "scripts": {
    "lint": "semistandard '**/*.js'",
    "pretest": "npm run lint",
    "test": "ava -T 20s --verbose test/*.test.js"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^2.2.5",
    "ava": "^0.25.0",
    "proxyquire": "^2.0.0",
    "sinon": "^4.4.2"
  },
  "dependencies": {
    "@google-cloud/compute": "^0.10.0",
    "safe-buffer": "5.1.2",
    "reattempt-promise-function": "1.0.0",
    "promise.prototype.finally": "3.1.0"
  }
}

payload.json

{
  "zone": "<zone>",
  "instance": "slamdata-bigquery",
  "metadata": {
    "startup-script": "/home/<username>/sd-bq.sh <project> <dataset> '{}' no_partitioning http://localhost"
  },
  "originalMetadata": {}
}

sd-bq-single.sh

#!/bin/bash
# Streams a single virtual table into Google BigQuery as a CSV load job.
set -e
set -o pipefail
ACCESS_TOKEN=$(gcloud auth application-default print-access-token)
DESTINATION_PROJECT_ID=$1
DESTINATION_DATASET_ID=$2
FLOAT64_COLUMN_NAMES=$3
PARTITIONING=$4
TABLE_URL=$5
echo "Getting virtual table name..."
TABLE_INFO=$(curl --fail "$TABLE_URL")
echo "Table info:"
echo "${TABLE_INFO}"
TABLE_NAME_ORIGINAL=$(echo "${TABLE_INFO}" | jq -r .name)
TABLE_NAME=$(echo "${TABLE_NAME_ORIGINAL}" | perl -pe 'chomp if eof' | perl -pe 's/[^A-Za-z0-9]/_/g')
echo "Virtual table name: ${TABLE_NAME}"
# Map column types to BigQuery types; columns listed for this table in FLOAT64_COLUMN_NAMES are forced to FLOAT64, and spaces in column names become underscores.
TABLE_COLUMNS=$(echo "${TABLE_INFO}" | jq "[.columns | .[] | .type = if (.column as \$column | ${FLOAT64_COLUMN_NAMES} | .\"${TABLE_NAME_ORIGINAL}\" | index(\$column)) then \"FLOAT64\" elif .type == \"offsetdatetime\" then \"TIMESTAMP\" elif .type == \"number\" then \"NUMERIC\" elif .type == \"string\" then \"STRING\" elif .type == \"boolean\" then \"BOOLEAN\" else null end | .[\"name\"] = (.column | gsub(\"[ ]\"; \"_\")) | del(.column)]")
echo "Virtual table columns: ${TABLE_COLUMNS}"
PARTITIONING_INFO=$(if [ "$PARTITIONING" = "day_partitioning" ]; then printf '"timePartitioning": { "type": "DAY" },'; else printf ''; fi)
WRITE_DISPOSITION=$(if [ "$PARTITIONING" = "day_partitioning" ]; then printf 'WRITE_APPEND'; else printf 'WRITE_TRUNCATE'; fi)
JOB_CONFIGURATION=\
"{\
\"configuration\": {\
\"load\": {\
\"sourceFormat\": \"CSV\",\
\"skipLeadingRows\": 1,\
\"allowQuotedNewlines\": true,\
\"schemaUpdateOptions\": [ \"ALLOW_FIELD_ADDITION\" ],\
\"schema\": {\
\"fields\": $TABLE_COLUMNS\
},$PARTITIONING_INFO\
\"writeDisposition\": \"$WRITE_DISPOSITION\",\
\"destinationTable\": {\
\"projectId\": \"$DESTINATION_PROJECT_ID\",\
\"datasetId\": \"$DESTINATION_DATASET_ID\",\
\"tableId\": \"$TABLE_NAME\"\
}\
}\
}\
}"
echo "Job configuration: ${JOB_CONFIGURATION}"
echo "Creating job..."
# Create a resumable-upload load job; the Location header of the response is the upload URL.
JOB_URL_PRIME=$(echo $JOB_CONFIGURATION | curl --fail -i -H "Authorization: Bearer $ACCESS_TOKEN" -H "Content-type: application/json" --data @- -X POST "https://www.googleapis.com/upload/bigquery/v2/projects/${DESTINATION_PROJECT_ID}/jobs?uploadType=resumable")
echo "Job url prime: ${JOB_URL_PRIME}"
JOB_URL=$(printf "${JOB_URL_PRIME}" | perl -n -e '/^Location: (.*)/i && print $1')
echo "Job URL: ${JOB_URL}"
echo "Streaming virtual table into Google BigQuery..."
# Stream the virtual table's CSV export straight into the upload URL (stripping a trailing carriage return from the URL).
curl --fail "$TABLE_URL/live/dataset" | curl --fail -X PUT --data-binary @- ${JOB_URL%$'\r'}
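
sd-bq.sh (next) runs the script above once per virtual table; you can also invoke it by hand against a single table, e.g. (project, dataset and table id below are placeholders):

./sd-bq-single.sh example_project example_dataset '{}' no_partitioning http://localhost/api/table/<table-id>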

sd-bq.sh

#!/bin/bash
# Lists the non-archived virtual tables on the server and runs sd-bq-single.sh for each one.
set -e
set -o pipefail
SCRIPT_DIRECTORY=$(dirname "${BASH_SOURCE}")
DESTINATION_PROJECT_ID=$1
DESTINATION_DATASET_ID=$2
FLOAT64_COLUMN_NAMES=$3
PARTITIONING=$4
VIRTUAL_TABLES_URL="${5}/api/tables"
VIRTUAL_TABLE_URL_PREFIX="${5}/api/table/"
echo "Getting virtual table URLs"
# Retry up to 15 times (5 seconds apart) while the server starts up; skip tables whose names contain "[Archived] ".
VIRTUAL_TABLE_URLS=$(for i in {1..15}; do curl --fail "${VIRTUAL_TABLES_URL}" | jq -r 'to_entries | map(select(select(.value."name" | contains("[Archived] ") | not))) | from_entries | keys | .[]' | sed -e "s#^#${VIRTUAL_TABLE_URL_PREFIX}#" && break || sleep 5; done)
echo "$VIRTUAL_TABLE_URLS"
echo "Running process for each virtual table"
echo "$VIRTUAL_TABLE_URLS" | xargs -L1 $SCRIPT_DIRECTORY/sd-bq-single.sh $DESTINATION_PROJECT_ID $DESTINATION_DATASET_ID "$FLOAT64_COLUMN_NAMES" $PARTITIONING