public static final TupleTag<JsonNode> MAIN_OUT = new TupleTag<JsonNode>() {};
public static final TupleTag<BigQueryProcessError> DEADLETTER_OUT = new TupleTag<BigQueryProcessError>() {};

@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRowWithEvent> out) {
    TableRow convertedRow = new TableRow();
    try {
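The two tags and the `try` block above point at a dead-letter pattern: elements that convert cleanly go to the main output, and elements that throw go to a separate error output. The sketch below illustrates that routing in plain Java, without Beam. `DeadLetterRouter`, `Result`, and `route` are hypothetical names introduced for illustration only; they are not part of the original pipeline or of the Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java illustration of dead-letter routing (hypothetical, not Beam code). */
class DeadLetterRouter<I, O> {

    /** Holds converted values and a description of each input that failed. */
    static final class Result<O> {
        final List<O> main = new ArrayList<>();
        final List<String> deadLetter = new ArrayList<>();
    }

    private final Function<I, O> converter;

    DeadLetterRouter(Function<I, O> converter) {
        this.converter = converter;
    }

    /** Converts each input; failures are recorded instead of aborting the batch. */
    Result<O> route(List<I> inputs) {
        Result<O> result = new Result<>();
        for (I input : inputs) {
            try {
                result.main.add(converter.apply(input));
            } catch (RuntimeException e) {
                // Keep the offending input together with the error message.
                result.deadLetter.add(input + ": " + e.getMessage());
            }
        }
        return result;
    }
}
```

In a Beam `DoFn` the same idea would typically be expressed by emitting to the main tag inside the `try` and to the dead-letter tag inside the `catch`.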
EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(EventsProcessingOptions.class);
Pipeline p = Pipeline.create(options);
PCollectionTuple tableRows =
        // Read the Kafka topic
        p.apply("kafka-topic-read", kafkaReader)
                .apply("kafka-values", MapElements.into(TypeDescriptors.strings())
                        .via(record -> record.getKV().getValue()))
WriteResult writeResult = tableRowToInsertCollection
        .apply("BQ-write", BigQueryIO.write()
                // Specify that failed rows will be returned with their error
                .withExtendedErrorInfo()
                .to(tableSpec)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                // Specifies a policy for handling failed inserts
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
public class FieldsConfigurationProvider {
    private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
    private Long lastUpdateTime;
    private FieldsConfiguration config;
    private static FieldsConfigurationProvider instance = null;
    public static String bucketName;
    public static String filePath;
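The `lastUpdateTime` and `config` fields suggest that the provider caches the configuration and reloads it after some interval. A minimal sketch of that idea follows, assuming a time-based refresh; `RefreshingConfigCache`, its `loader`, and the injectable `clock` are hypothetical names introduced here, and the actual loading from a bucket is left to the caller.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Caches a value and reloads it once the check interval has elapsed (hypothetical sketch). */
class RefreshingConfigCache<T> {
    private final Supplier<T> loader;       // assumption: e.g. reads the config file from a bucket
    private final long checkIntervalMillis; // how long a cached value stays fresh
    private final LongSupplier clock;       // injectable so tests can control time
    private T cached;
    private Long lastUpdateTime;            // null until the first load

    RefreshingConfigCache(Supplier<T> loader, long checkIntervalMillis, LongSupplier clock) {
        this.loader = loader;
        this.checkIntervalMillis = checkIntervalMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it first if it was never loaded or is stale. */
    synchronized T get() {
        long now = clock.getAsLong();
        if (lastUpdateTime == null || now - lastUpdateTime >= checkIntervalMillis) {
            cached = loader.get();
            lastUpdateTime = now;
        }
        return cached;
    }
}
```

In production code the clock would usually be `System::currentTimeMillis`; `synchronized` keeps concurrent callers from triggering overlapping reloads.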
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {
    private FieldsConfigurationProvider fieldsConfigurationProvider;

    public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
        this.filedConfigurationBucketName = filedConfigurationBucketName;
        this.filedConfigurationFilePath = filedConfigurationFilePath;
        this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
    }
PCollection<Long> ticks = p
        // Produce 1 "tick" per second
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        // Window the ticks into 1-second windows
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        // Use an arbitrary per-window combiner to reduce to 1 element per window
        .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields())