Skip to content

Instantly share code, notes, and snippets.

View brachi-wernick's full-sized avatar

Brachi Packter brachi-wernick

  • moonactive
  • Israel
View GitHub Profile
@brachi-wernick
brachi-wernick / MyFn.java
Last active February 7, 2020 11:58
MyFn
public class MyFn extends DoFn<String, String> {
private static RedissonClient redisClient;
private final static Object lock = new Object();
private String redisHost;
private int redisPort;
MyFn(String redisHost, Integer redisPort) {
this.redisHost = redisHost;
@brachi-wernick
brachi-wernick / DoFn.java
Created January 10, 2020 11:18
Using field config in the DOFN
@ProcessElement
public void processElement(@Element JsonNode jsonNode, ProcessContext context, PipelineOptions options) {
FieldConfigurationOptions fieldConfigurationOptions = options.as(FieldConfigurationOptions.class);
FieldConfigurationLoader fieldsConfigurationProvider = fieldConfigurationOptions.getFieldsConfigurationProvider();
FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
TableRow tableRow = new TableRow();
@brachi-wernick
brachi-wernick / FieldsConfigurationProvider.java
Last active January 10, 2020 11:06
FieldsConfigurationProvider
private void watch() {
new Thread(() -> {
Storage storage = StorageOptions.getDefaultInstance().getService();
Bucket bucket = storage.get(bucketName);
while(true) {
try {
Blob blob = bucket.get(filePath);
long currentTime = blob.getUpdateTime();
@brachi-wernick
brachi-wernick / FieldsConfigurationProvider.java
Created January 10, 2020 11:01
FieldsConfigurationProvider
public class FieldsConfigurationProvider{
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
private Long lastUpdateTime;
private FieldsConfiguration config;
private String bucketName;
private String filePath;
private long fieldsConfigLoadInterval;
@brachi-wernick
brachi-wernick / IterateWithSideInput.java
Last active March 6, 2019 16:07
IterateWithSideInput
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields())
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields())
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));
@brachi-wernick
brachi-wernick / UseSideInput.java
Created March 6, 2019 16:05
USing side input
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));
@brachi-wernick
brachi-wernick / SideInput.java
Created March 6, 2019 16:04
SideInput load config
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();
@brachi-wernick
brachi-wernick / EventToTableRowTransformerFn.java
Last active March 6, 2019 13:06
EventToTableRowTransformerFn
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {
private FieldsConfigurationProvider fieldsConfigurationProvider;
public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
this.filedConfigurationBucketName = filedConfigurationBucketName;
this.filedConfigurationFilePath = filedConfigurationFilePath;
this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
}