Last active
February 10, 2019 15:10
-
-
Save brachi-wernick/1bdf74149c00d6a72b357977169acb6b to your computer and use it in GitHub Desktop.
Pipeline with tagged outputs, routing processing errors to an error table
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build and run the events pipeline:
//   Kafka records -> value strings -> JsonNode -> rows split into two tagged outputs.
// Elements tagged MAIN_OUT are appended to the main BigQuery table (tableSpec); elements
// tagged DEADLETTER_OUT go through BigQueryProcessErrorExtracFn and are appended to the
// error table (errTableSpec).
EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(EventsProcessingOptions.class);
Pipeline p = Pipeline.create(options);

// EventsRowFn emits to MAIN_OUT and to the additional output DEADLETTER_OUT;
// withOutputTags makes the ParDo return a PCollectionTuple keyed by those tags.
PCollectionTuple tableRows =
    p
        // read the Kafka topic
        .apply("kafka-topic-read", kafkaReader)
        // keep only each record's value; the Kafka key is dropped here
        .apply("kafka-values", MapElements.into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        // convert each value string to a JsonNode
        .apply("string-to-json", ParseJsons.of(JsonNode.class))
        // create rows; presumably elements that fail conversion are emitted to
        // DEADLETTER_OUT — confirm in EventsRowFn
        .apply("Build-table-row", ParDo.of(new EventsRowFn())
            .withOutputTags(MAIN_OUT, TupleTagList.of(DEADLETTER_OUT)));

// Append MAIN_OUT rows to the main table. CREATE_NEVER: the table must already exist.
// NOTE(review): this chain sets no withFormatFunction(...). BigQueryIO.Write requires one
// for a custom element type such as TableRowWithEvent and rejects the transform when it is
// expanded — confirm how TableRowWithEvent is converted to a TableRow and add it here.
tableRows
    .get(MAIN_OUT)
    .apply("BQ-write", BigQueryIO.<TableRowWithEvent>write()
        .to(tableSpec)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Extract error details from DEADLETTER_OUT elements and append them to the error table,
// creating that table from errSchema if it does not exist yet.
tableRows
    .get(DEADLETTER_OUT)
    .apply("BQ-process-error-extract", ParDo.of(new BigQueryProcessErrorExtracFn()))
    .apply("BQ-process-error-write", BigQueryIO.writeTableRows()
        .to(errTableSpec)
        .withJsonSchema(errSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment