


Brachi Packter brachi-wernick

  • moonactive
  • Israel
@brachi-wernick
brachi-wernick / ParDoWithTAg.java
Last active February 10, 2019 15:10
ParDo with a tag
public static final TupleTag<JsonNode> MAIN_OUT = new TupleTag<JsonNode>() {};
public static final TupleTag<BigQueryProcessError> DEADLETTER_OUT = new TupleTag<BigQueryProcessError>() {};
@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRowWithEvent> out) {
TableRow convertedRow = new TableRow();
try {
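The ParDo above declares two TupleTags: MAIN_OUT for elements that convert cleanly and DEADLETTER_OUT for elements that fail. A minimal plain-Java sketch of that routing idea, with no Beam dependency, assuming integer parsing as a stand-in for the JSON-to-TableRow conversion; `ProcessError`, `Routed`, and `DeadLetterRouter` are hypothetical names, not classes from the gist.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical error record, loosely standing in for BigQueryProcessError.
record ProcessError(String input, String message) {}

// The two outputs: the main output and the dead-letter output.
record Routed(List<Integer> main, List<ProcessError> deadLetter) {}

class DeadLetterRouter {
    // Converts each input; successes go to the main output, failures to the dead-letter output.
    static Routed route(List<String> inputs) {
        List<Integer> main = new ArrayList<>();
        List<ProcessError> deadLetter = new ArrayList<>();
        for (String input : inputs) {
            try {
                main.add(Integer.parseInt(input.trim()));
            } catch (NumberFormatException e) {
                // Keep the raw input and the reason so the element can be inspected or replayed later.
                deadLetter.add(new ProcessError(input, e.getMessage()));
            }
        }
        return new Routed(main, deadLetter);
    }
}
```

In Beam itself the equivalent is a DoFn that takes a `MultiOutputReceiver` and calls `out.get(MAIN_OUT).output(...)` or `out.get(DEADLETTER_OUT).output(...)`, applied with `ParDo.of(...).withOutputTags(MAIN_OUT, TupleTagList.of(DEADLETTER_OUT))` so the result is a `PCollectionTuple`.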
brachi-wernick / PipelineWithTagForError.java
Last active February 10, 2019 15:10
Pipeline with tags, processing errors
EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(EventsProcessingOptions.class);
Pipeline p = Pipeline.create(options);
PCollectionTuple tableRows =
// read kafka topic
p.apply("kafka-topic-read", kafkaReader)
.apply("kafka-values", MapElements.into(TypeDescriptors.strings())
.via(record -> record.getKV().getValue()))
brachi-wernick / BQInsertErrorHandling.java
Last active February 5, 2020 12:07
Error handling for BQ inserts
WriteResult writeResult = tableRowToInsertCollection
.apply("BQ-write", BigQueryIO.writeTableRows()
// specify that failed rows will be returned with their error
.withExtendedErrorInfo()
.to(tableSpec)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
// Specifies a policy for handling failed inserts.
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
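`InsertRetryPolicy.retryTransientErrors()` is described in the Beam docs as retrying all failures except known persistent errors. A plain-Java sketch of that decision, assuming each failed row carries a list of BigQuery error reason strings; the persistent set below ("invalid", "invalidQuery", "notImplemented") is an assumption to check against the Beam source, and `InsertFailure` and `TransientRetryPolicy` are hypothetical names.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of one failed insert: the error reasons reported for the row.
record InsertFailure(List<String> reasons) {}

class TransientRetryPolicy {
    // Assumed set of reasons treated as persistent, i.e. not worth retrying.
    private static final Set<String> PERSISTENT_REASONS =
            Set.of("invalid", "invalidQuery", "notImplemented");

    // Retry unless at least one reported reason is known to be persistent.
    static boolean shouldRetry(InsertFailure failure) {
        for (String reason : failure.reasons()) {
            // Set.of(...).contains(null) throws, so skip null reasons explicitly.
            if (reason != null && PERSISTENT_REASONS.contains(reason)) {
                return false;
            }
        }
        return true;
    }
}
```

Rows the policy gives up on are not lost: with `withExtendedErrorInfo()` they are returned by `writeResult.getFailedInsertsWithErr()` as `BigQueryInsertError` elements, which can be routed to a dead-letter table.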
brachi-wernick / FieldsConfigurationProvider.java
Last active March 5, 2019 11:00
Field Configuration loading
public class FieldsConfigurationProvider {
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
private Long lastUpdateTime;
private FieldsConfiguration config;
private static FieldsConfigurationProvider instance = null;
public static String bucketName;
public static String filePath;
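FieldsConfigurationProvider keeps a `lastUpdateTime` next to the cached `config`, and EventToTableRowTransformerFn is constructed with a check interval, which suggests the fields configuration is reloaded from the bucket once that interval has passed. A minimal sketch of that pattern under those assumptions: a `Supplier` replaces the bucket read and an injectable clock makes it testable; `ReloadingConfigProvider` is a hypothetical name and the gist's static singleton is not reproduced.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Caches a loaded config and reloads it once checkIntervalMillis has elapsed.
class ReloadingConfigProvider<C> {
    private final Supplier<C> loader;       // stands in for reading the config file from a bucket
    private final LongSupplier clock;       // e.g. System::currentTimeMillis in production
    private final long checkIntervalMillis;
    private C config;
    private long lastUpdateTime;
    private boolean loaded = false;

    ReloadingConfigProvider(Supplier<C> loader, LongSupplier clock, long checkIntervalMillis) {
        this.loader = loader;
        this.clock = clock;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    // Returns the cached config, loading it first if it was never loaded or has gone stale.
    synchronized C get() {
        long now = clock.getAsLong();
        if (!loaded || now - lastUpdateTime >= checkIntervalMillis) {
            config = loader.get();
            lastUpdateTime = now;
            loaded = true;
        }
        return config;
    }
}
```

`synchronized` keeps concurrent callers (for example several DoFn threads in one worker) from reloading at the same time; the trade-off is that a slow reload blocks readers until it finishes.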
brachi-wernick / EventToTableRowTransformerFn.java
Last active March 6, 2019 13:06
EventToTableRowTransformerFn
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {
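private FieldsConfigurationProvider fieldsConfigurationProvider;
private final String filedConfigurationBucketName;
private final String filedConfigurationFilePath;
private final Long filedConfigurationCheckInterval;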
public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
this.filedConfigurationBucketName = filedConfigurationBucketName;
this.filedConfigurationFilePath = filedConfigurationFilePath;
this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
}
brachi-wernick / SideInput.java
Created March 6, 2019 16:04
SideInput load config
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-second windows
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();
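The ticks stream above emits one element per second and windows it with `FixedWindows.of(Duration.standardSeconds(1))` before combining, so the window size decides how many ticks land in each window and therefore how often the combined output fires. A plain-Java sketch of fixed-window assignment, assuming epoch-aligned windows with no offset; `FixedWindowSketch` is a hypothetical helper, not Beam's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FixedWindowSketch {
    // Start (inclusive) of the epoch-aligned window of sizeMillis that contains timestampMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Counts how many timestamps fall into each window, keyed by window start.
    static Map<Long, Long> countPerWindow(List<Long> timestampsMillis, long sizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, sizeMillis), 1L, Long::sum);
        }
        return counts;
    }
}
```

For ticks at 0 s, 1 s and 2 s, 1-second windows give three windows holding one tick each, while a 1-minute window holds all three, so the combine would emit one element per minute instead of one per second.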
brachi-wernick / UseSideInput.java
Created March 6, 2019 16:05
Using side input
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields())
brachi-wernick / IterateWithSideInput.java
Last active March 6, 2019 16:07
IterateWithSideInput
for (FieldConfig fieldConfig : context.sideInput(sideView).getFields())
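The loop over `context.sideInput(sideView).getFields()` suggests each event is turned into a row by walking the configured fields. A plain-Java sketch of that step, assuming the event is a `Map` and each field config names a source field and a target column; this `FieldConfig` record and `EventToRow` are hypothetical, since the gist's own FieldConfig shape is not shown. In the DoFn the list would come from the side input and the row would be a `TableRow`, filled with `set(name, value)`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical field configuration: which event field to read and which column to write.
record FieldConfig(String sourceField, String column) {}

class EventToRow {
    // Builds a row with only the configured fields that are present in the event.
    static Map<String, Object> toRow(Map<String, Object> event, List<FieldConfig> fields) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (FieldConfig fieldConfig : fields) {
            if (event.containsKey(fieldConfig.sourceField())) {
                row.put(fieldConfig.column(), event.get(fieldConfig.sourceField()));
            }
        }
        return row;
    }
}
```

Fields missing from the event are skipped rather than written as nulls, and event fields absent from the configuration are dropped, so a config change picked up through the side input changes the row shape without redeploying the pipeline.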