Last active
August 14, 2017 11:38
-
-
Save spark2ignite/a833f04c98d38c11a7398967835c9317 to your computer and use it in GitHub Desktop.
How we saved over $240K per year by replacing Mixpanel with Google BigQuery, Pub/Sub, Dataflow & Kubernetes (code snippet #3)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jellybtn; | |
import com.google.api.services.bigquery.model.TableReference; | |
import com.google.api.services.bigquery.model.TableRow; | |
import com.google.api.services.bigquery.model.TableSchema; | |
import com.google.cloud.dataflow.sdk.Pipeline; | |
import com.google.cloud.dataflow.sdk.io.BigQueryIO; | |
import com.google.cloud.dataflow.sdk.io.PubsubIO; | |
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; | |
import com.google.cloud.dataflow.sdk.transforms.Filter; | |
import com.google.cloud.dataflow.sdk.transforms.Flatten; | |
import com.google.cloud.dataflow.sdk.transforms.MapElements; | |
import com.google.cloud.dataflow.sdk.transforms.ParDo; | |
import com.google.cloud.dataflow.sdk.values.*; | |
import org.json.simple.parser.ParseException; | |
import java.io.IOException; | |
public class JellybtnIngestFlow { | |
private static final TupleTag < TableRow > outputTag = new TupleTag < TableRow > () {}; | |
private static final TupleTag < TableRow > errorsTag = new TupleTag < TableRow > () {}; | |
private static PCollection < TableRow > handleEvents(Pipeline pipeline, JellybtnIngestFlowOptions options) throws IOException, ParseException { | |
PCollection < String > rawEvents = pipeline.apply("Read Events PubSub Messages", | |
PubsubIO.Read | |
.subscription(options.getEventsSubscriptionPath()) | |
); | |
PCollectionTuple mappedEvents = rawEvents.apply("Map Events", ParDo | |
.withOutputTags(outputTag, TupleTagList.of(errorsTag)) | |
.of(new MapEvents(errorsTag))); | |
PCollection < TableRow > events = mappedEvents.get(outputTag); | |
writeToTable(events, | |
TableSchemas.people(), | |
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getEventsTableName()), | |
options.getDatasets(), options.getDatasetSuffix()); | |
return mappedEvents.get(errorsTag); | |
} | |
private static PCollection < TableRow > handlePeople(Pipeline pipeline, JellybtnIngestFlowOptions options) throws IOException, ParseException { | |
PCollection < String > rawPeople = pipeline.apply("Read People PubSub Messages", | |
PubsubIO.Read | |
.subscription(options.getPeopleSubscriptionPath()) | |
); | |
PCollectionTuple mappedPeople = rawPeople.apply("Map people", ParDo | |
.withOutputTags(outputTag, TupleTagList.of(errorsTag)) | |
.of(new MapPeople(errorsTag))); | |
PCollection < TableRow > people = mappedPeople.get(outputTag); | |
writeToTable(people, | |
TableSchemas.people(), | |
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getPeopleTableName()), | |
options.getDatasets(), options.getDatasetSuffix()); | |
return mappedPeople.get(errorsTag); | |
} | |
private static void writeToTable(PCollection < TableRow > rows, TableSchema tableSchema, TableReference tableRef, String[] datasets, String dataSetSuffix) throws IOException { | |
for (String dataset: datasets) { | |
tableRef.setDatasetId(dataset + dataSetSuffix); | |
PCollection < TableRow > datasetRows = rows.apply("Filter for " + tableRef.toPrettyString() + " table", Filter.byPredicate((TableRow row) - > dataset.equals(row.get("dataset")))); | |
datasetRows = datasetRows.apply("Remap for " + tableRef.toPrettyString() + " table", MapElements.via((TableRow row) - > { | |
row.remove("dataset"); | |
return row; | |
}).withOutputType(TypeDescriptor.of(TableRow.class))); | |
datasetRows.apply("Write BigQuery " + tableRef.toPrettyString() + " table", | |
BigQueryIO.Write | |
.to(tableRef) | |
.withSchema(tableSchema) | |
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) | |
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) | |
); | |
} | |
} | |
public static void main(String[] args) throws IOException, ParseException { | |
PipelineOptionsFactory.register(JellybtnIngestFlowOptions.class); | |
JellybtnIngestFlowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JellybtnIngestFlowOptions.class); | |
options.setStreaming(true); | |
Pipeline pipeline = Pipeline.create(options); | |
PCollection < TableRow > errorEvents = handleEvents(pipeline, options); | |
PCollection < TableRow > errorPeople = handlePeople(pipeline, options); | |
PCollectionList < TableRow > joinedErrors = PCollectionList.of(errorEvents).and(errorPeople); | |
PCollection < TableRow > errors = joinedErrors.apply("Join errors", Flatten.pCollections()); | |
writeToTable(errors, | |
TableSchemas.errors(), | |
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getErrorsTableName()), | |
options.getDatasets(), options.getDatasetSuffix()); | |
pipeline.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment