Skip to content

Instantly share code, notes, and snippets.

@therako
Last active March 4, 2018 09:42
Show Gist options
  • Save therako/9e8157c7d495a6c0b4a3b834bbb75eda to your computer and use it in GitHub Desktop.
Save therako/9e8157c7d495a6c0b4a3b834bbb75eda to your computer and use it in GitHub Desktop.
/**
 * Entry point for a Beam pipeline that demonstrates the dead-letter pattern.
 *
 * <p>Messages are read from a Pub/Sub subscription and parsed by {@code MsgParser}, which emits
 * to two tagged outputs: {@code MsgParser.SuccessfulParse} and
 * {@code DeadLetterHandler.DeadLetterTag}. The first output is written to one BigQuery table;
 * the second is turned into error rows, written to a separate BigQuery table, and also counted
 * per fixed window and handed to {@code DeadLetterHandler.NotifySlack}.
 */
public class DeadLetterPatternExample {

  /**
   * Assembles the pipeline from the supplied options and starts it.
   *
   * @param args command-line arguments, parsed (with validation) into
   *     {@code DeadLetterPatternPipelineOptions}
   */
  public static void main(String[] args) {
    DeadLetterPatternPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DeadLetterPatternPipelineOptions.class);
    final Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);

    // Read from Pub/Sub and parse; the parser has a main output plus a dead-letter output.
    PCollectionTuple parsed =
        pipeline
            .apply(
                "PubSubReader",
                PubsubIO.readMessages().fromSubscription(options.getPubSubSubscriber()))
            .apply(
                "ParseMsg",
                ParDo.of(new MsgParser())
                    .withOutputTags(
                        MsgParser.SuccessfulParse,
                        TupleTagList.of(DeadLetterHandler.DeadLetterTag)));

    // Happy path - Write to BQ
    PCollection<TableRow> parsedRows =
        parsed.get(MsgParser.SuccessfulParse).setCoder(TableRowJsonCoder.of());
    parsedRows.apply(
        "WriteToBQ",
        BigQueryIO.writeTableRows()
            .to(options.getBqTable())
            .withSchema(MsgParser.getTableSchema()));

    // Dead-letter path - the coder is set explicitly on the tagged output before it is used.
    PCollection<DeadLetterHandler.DeadLetterError> deadLetters =
        parsed
            .get(DeadLetterHandler.DeadLetterTag)
            .setCoder(AvroCoder.of(DeadLetterHandler.DeadLetterError.class));
    PCollection<TableRow> errorRows =
        deadLetters.apply("BuildErrorRecord", ParDo.of(new DeadLetterHandler.BuildErrorRecord()));

    // Errors - Write raw data to BQ along with error details
    errorRows.apply(
        "WriteErrorsToBQ",
        BigQueryIO.writeTableRows()
            .to(options.getBqErrorsTable())
            .withSchema(DeadLetterHandler.DeadLetterError.getTableSchema()));

    // Errors - Notify via slack on high error rates.
    // The same error rows are windowed into fixed intervals and combined into one value per
    // window; withoutDefaults() is required because the combine runs on a non-global window.
    Duration errorWindowSize = Duration.standardSeconds(options.getErrorRateWindow());
    PCollection<TableRow> windowedErrors =
        errorRows.apply("ErrorsCounterWindow", Window.into(FixedWindows.of(errorWindowSize)));
    PCollection<Long> errorCounts =
        windowedErrors
            .apply("CountErrors", Combine.globally(new CountFn<TableRow>()).withoutDefaults())
            .setCoder(BigEndianLongCoder.of());
    errorCounts.apply(
        "NotifySlackOnHighErrorRate",
        ParDo.of(
            new DeadLetterHandler.NotifySlack(
                options.getMaxErrorRate(),
                options.getSlackKey(),
                options.getSlackChannel(),
                options.getJobName())));

    pipeline.run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment