This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MyFn extends DoFn<String, String> { | |
private static RedissonClient redisClient; | |
private final static Object lock = new Object(); | |
private String redisHost; | |
private int redisPort; | |
MyFn(String redisHost, Integer redisPort) { | |
this.redisHost = redisHost; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@ProcessElement | |
public void processElement(@Element JsonNode jsonNode, ProcessContext context, PipelineOptions options) { | |
FieldConfigurationOptions fieldConfigurationOptions = options.as(FieldConfigurationOptions.class); | |
FieldConfigurationLoader fieldsConfigurationProvider = fieldConfigurationOptions.getFieldsConfigurationProvider(); | |
FieldsConfiguration config = fieldsConfigurationProvider.getConfig(); | |
TableRow tableRow = new TableRow(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void watch() { | |
new Thread(() -> { | |
Storage storage = StorageOptions.getDefaultInstance().getService(); | |
Bucket bucket = storage.get(bucketName); | |
while(true) { | |
try { | |
Blob blob = bucket.get(filePath); | |
long currentTime = blob.getUpdateTime(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FieldsConfigurationProvider{ | |
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class); | |
private Long lastUpdateTime; | |
private FieldsConfiguration config; | |
private String bucketName; | |
private String filePath; | |
private long fieldsConfigLoadInterval; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) | |
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView)); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) | |
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView)); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PCollection<Long> ticks = p | |
// Produce 1 "tick" per second | |
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1))) | |
// Window the ticks into 1-minute windows | |
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) | |
// Use an arbitrary per-window combiner to reduce to 1 element per window | |
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults()); | |
String bucketName = options.getBigQuerySchemaConfigBucketName().get(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> { | |
private FieldsConfigurationProvider fieldsConfigurationProvider; | |
public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) { | |
this.filedConfigurationBucketName = filedConfigurationBucketName; | |
this.filedConfigurationFilePath = filedConfigurationFilePath; | |
this.filedConfigurationCheckInterval = filedConfigurationCheckInterval; | |
} |