#!/usr/bin/env bash
PROJECT="<project-id>"
### Sinks ###
# Base sink for all logs to BigQuery
gcloud logging sinks create --quiet --project "$PROJECT" "${PROJECT}_bigquery_sink" \
  bigquery.googleapis.com/projects/billing-exports-184901/datasets/CLOUD_LOGGING_EXPORTS \
  --log-filter "resource.type:*"
#### Metrics ####
/**
 * Dataflow streaming pipeline to read tweets from PubSub topic and write the payload to BigQuery
 */
public class TweetPipeline {
    private static final String TOPIC = "projects/grey-sort-challenge/topics/twitter";
    private static final String BIGQUERY_DESTINATION = "%s:twitter.tweets";

    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options = PipelineOptionsFactory
runtime: custom
api_version: '1.0'
env: flexible
threadsafe: true
automatic_scaling:
  min_num_instances: 1
  max_num_instances: 2
  cpu_utilization:
    target_utilization: 0.5
FROM python:2
RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install --upgrade oauth2client
RUN pip install python-dateutil
ADD twitter-to-pubsub.py /twitter-to-pubsub.py
ADD utils.py /utils.py
"""This script uses the Twitter Streaming API, via the tweepy library, | |
to pull in tweets and publish them to a PubSub topic. | |
""" | |
import base64 | |
import datetime | |
import utils | |
from tweepy import OAuthHandler | |
from tweepy import Stream | |
from tweepy.streaming import StreamListener |
terraform {
  backend "gcs" {
    bucket = "tf-state-gcp-batch-ingestion"
    region = "australia-southeast1"
    prefix = "terraform/state"
  }
}

provider "google" {
  project = "grey-sort-challenge"
package org.polleyg;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
// gcloud --project=grey-sort-challenge functions deploy goWithTheDataFlow --stage-bucket gs://batch-pipeline --trigger-bucket gs://batch-pipeline
const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
  const file = event.data;
  const context = event.context;
  console.log("File is: ", file);
  console.log("State is: ", context.eventType);
  if (context.eventType === 'google.storage.object.finalize' && file.name.indexOf('upload/') !== -1) {
steps:
# 1. Fetch the source code
- name: gcr.io/cloud-builders/git
  args: ['clone', 'https://github.com/polleyg/gcp-batch-ingestion-bigquery.git']
# 2a. Set up GCS & BQ etc. using public terraform Docker image
- name: hashicorp/terraform
  args: ['init']
  dir: 'terraform'
Cloud Functions are an extremely powerful and elegant way to solve problems. You don't need to worry about any of the infrastructure, nor about scaling. It's a win-win! However, when deploying[1] your Cloud Function to trigger with GCS, you can currently only set the --trigger-bucket parameter to an actual GCS bucket.
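For example, the deploy command below (it mirrors the one in the comment at the top of the function source above) wires the trigger to object changes across the whole bucket; there's no flag for narrowing it down to a folder:

# Trigger fires for every object change anywhere in gs://batch-pipeline - no folder/prefix option
gcloud --project=grey-sort-challenge functions deploy goWithTheDataFlow \
  --stage-bucket gs://batch-pipeline \
  --trigger-bucket gs://batch-pipeline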
But what if you only want to trigger on files in a specific folder (or folders) within that bucket?
Not to fret! There's a little trick for this. In fact, the object/file name (i.e. the new file that's been uploaded to your folder) actually contains the full path, including any folder names. So, in your Cloud Function, all you need to do is this:
function(event, callback) {
  const file = event.data;
  // file.name holds the full object path, so a simple prefix check is all that's needed
  if (file.resourceState === 'exists' && file.name && file.name.indexOf('my_lovely_folder/') !== -1) {