Skip to content

Instantly share code, notes, and snippets.

@polleyg
Created September 6, 2018 12:11
Show Gist options
  • Save polleyg/9a1c4084144d1bc801fb8725e564805c to your computer and use it in GitHub Desktop.
Save polleyg/9a1c4084144d1bc801fb8725e564805c to your computer and use it in GitHub Desktop.
Cloud Function for triggering templated Dataflow pipelines
//gcloud --project=grey-sort-challenge functions deploy goWithTheDataFlow --stage-bucket gs://batch-pipeline --trigger-bucket gs://batch-pipeline
const google = require('googleapis');
exports.goWithTheDataFlow = function(event, callback) {
const file = event.data;
const context = event.context;
console.log("File is: ", file);
console.log("State is: ", context.eventType);
if (context.eventType === 'google.storage.object.finalize' && file.name.indexOf('upload/') !== -1) {
google.auth.getApplicationDefault(function (err, authClient) {
if (err) {
throw err;
}
// See https://cloud.google.com/compute/docs/authentication for more information on scopes
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
// Scopes can be specified either as an array or as a single, space-delimited string.
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
google.auth.getDefaultProjectId(function(err, projectId) {
if (err || !projectId) {
console.error(`Problems getting projectId (${projectId}). Err was: `, err);
throw err;
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
dataflow.projects.templates.create({
projectId: projectId,
resource: {
parameters: {
inputFile: `gs://${file.bucket}/${file.name}`
},
jobName: 'called-from-a-cloud-function-batch-pipeline-' + new Date().getTime(),
gcsPath: 'gs://batch-pipeline/template/pipeline'
}
}, function(err, response) {
if (err) {
console.error("Problem running dataflow template, error was: ", err);
}
console.log("Dataflow template response: ", response);
callback();
});
});
});
} else {
console.log("Nothing to do here, ignoring.");
callback();
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment