@brachi-wernick
Last active February 10, 2019 15:10
pipeline with tags, process error
EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(EventsProcessingOptions.class);
Pipeline p = Pipeline.create(options);
PCollectionTuple tableRows =
        // read kafka topic
        p.apply("kafka-topic-read", kafkaReader)
                .apply("kafka-values", MapElements.into(TypeDescriptors.strings())
                        .via(record -> record.getKV().getValue()))
                // convert value to JsonNode
                .apply("string-to-json", ParseJsons.of(JsonNode.class))
                // create TableRow
                .apply("Build-table-row", ParDo.of(new EventsRowFn())
                        .withOutputTags(MAIN_OUT, TupleTagList.of(DEADLETTER_OUT)));
// save the MAIN tag to BQ
tableRows
        .get(MAIN_OUT)
        .apply("BQ-write", BigQueryIO.<TableRowWithEvent>write()
                .to(tableSpec)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
// save the DEADLETTER_OUT to BQ error table
tableRows
        .get(DEADLETTER_OUT)
        .apply("BQ-process-error-extract", ParDo.of(new BigQueryProcessErrorExtracFn()))
        .apply("BQ-process-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
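
The snippet does not show EventsRowFn, so the following is only a rough, Beam-free sketch of the routing idea behind MAIN_OUT and DEADLETTER_OUT: rows that build successfully go to one output, and failures go to a second output along with the error message. Every name here (DeadLetterSketch, Outputs, buildRow, process, the "id" and "type" fields) is hypothetical and chosen for illustration; the real DoFn may validate and shape rows quite differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free stand-in for the MAIN_OUT / DEADLETTER_OUT split.
final class DeadLetterSketch {

    /** Holds the two outputs, loosely mirroring a PCollectionTuple with two tags. */
    static final class Outputs {
        final List<Map<String, String>> main = new ArrayList<>();
        final List<Map<String, String>> deadLetter = new ArrayList<>();
    }

    /** Assumed validation rule: an event without an "id" field cannot become a row. */
    static Map<String, String> buildRow(Map<String, String> event) {
        String id = event.get("id");
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("missing id");
        }
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", id);
        row.put("type", event.getOrDefault("type", "unknown"));
        return row;
    }

    /** Sends each event to the main output, or to the dead-letter output if building fails. */
    static Outputs process(List<Map<String, String>> events) {
        Outputs out = new Outputs();
        for (Map<String, String> event : events) {
            try {
                out.main.add(buildRow(event));
            } catch (RuntimeException e) {
                // Keep the original payload and the reason so the error table is debuggable.
                Map<String, String> error = new LinkedHashMap<>();
                error.put("payload", String.valueOf(event));
                error.put("error", e.getMessage());
                out.deadLetter.add(error);
            }
        }
        return out;
    }
}
```

In a real Beam DoFn the two `add` calls would presumably become outputs to the main tag and to DEADLETTER_OUT, which is what lets the pipeline above write each tag to its own BigQuery table.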