Brachi Packter brachi-wernick

  • moonactive
  • Israel
brachi-wernick / FieldsConfigurationProvider.java
Last active March 5, 2019 11:00
Field Configuration loading
public class FieldsConfigurationProvider {
private Logger logger = LoggerFactory.getLogger(FieldsConfigurationProvider.class);
private Long lastUpdateTime;
private FieldsConfiguration config;
private static FieldsConfigurationProvider instance = null;
public static String bucketName;
public static String filePath;
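The fields above (a cached `config`, its `lastUpdateTime`, and a static `instance`) suggest a shared provider that reloads the field configuration once it becomes stale. The rest of the class is not shown, so the following is only a minimal JDK-only sketch of that idea. `loader` stands in for whatever reads the file at `bucketName`/`filePath`, and `refreshIntervalMs` is a hypothetical reload period; neither is taken from the gist.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: a lazily loaded, periodically refreshed config holder.
// The loader and refresh interval are assumptions; the original gist loads
// FieldsConfiguration from a bucket/file path whose code is not shown.
final class RefreshingConfigProvider<T> {
    private final Supplier<T> loader;      // hypothetical: fetches a fresh config
    private final LongSupplier clock;      // returns "now" in milliseconds
    private final long refreshIntervalMs;  // hypothetical reload period
    private Long lastUpdateTime;           // null until the first load
    private T config;

    RefreshingConfigProvider(Supplier<T> loader, LongSupplier clock, long refreshIntervalMs) {
        this.loader = loader;
        this.clock = clock;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // synchronized so concurrent callers never trigger overlapping reloads
    synchronized T get() {
        long now = clock.getAsLong();
        if (lastUpdateTime == null || now - lastUpdateTime >= refreshIntervalMs) {
            config = loader.get();
            lastUpdateTime = now;
        }
        return config;
    }
}
```

A single provider can be kept in a static field, as the gist does with `instance`, so every caller in the same JVM shares one cached copy. Injecting the clock keeps the refresh logic testable without waiting in real time.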
brachi-wernick / BQInsertErrorHandling.java
Last active February 5, 2020 12:07
error handling BQ insert
WriteResult writeResult = tableRowToInsertCollection
.apply("BQ-write", BigQueryIO.writeTableRows()
// specify that failed rows will be returned with their error
.withExtendedErrorInfo()
.to(tableSpec)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
// Specifies a policy for handling failed inserts.
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
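With `retryTransientErrors()`, Beam keeps retrying a failed streaming insert unless BigQuery reports an error it treats as persistent. Rows it gives up on are returned in the `WriteResult`, and because of `withExtendedErrorInfo()` they can be read together with their error through `getFailedInsertsWithErr()`. The plain-Java model below only illustrates that retry decision. The set of persistent reasons is an assumption for illustration; Beam defines its own list inside `InsertRetryPolicy`, and it may differ between versions.

```java
import java.util.List;
import java.util.Set;

// Plain-Java model of a "retry transient errors only" decision.
// ASSUMPTION: the persistent reasons below are illustrative; Beam's
// InsertRetryPolicy defines its own set, which may differ by version.
final class TransientRetryModel {
    static final Set<String> PERSISTENT_REASONS =
            Set.of("invalid", "invalidQuery", "notImplemented");

    // A failed row is retried only if none of its error reasons is persistent.
    static boolean shouldRetry(List<String> errorReasons) {
        for (String reason : errorReasons) {
            if (PERSISTENT_REASONS.contains(reason)) {
                return false; // e.g. a schema mismatch will not succeed on retry
            }
        }
        return true; // e.g. "backendError" may succeed on a later attempt
    }
}
```

The practical consequence is that a row with a persistent error fails fast into the failed-inserts output instead of being retried indefinitely.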
brachi-wernick / PipelineWithTagForError.java
Last active February 10, 2019 15:10
pipeline with tags, process errors
EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(EventsProcessingOptions.class);
Pipeline p = Pipeline.create(options);
PCollectionTuple tableRows =
// read kafka topic
p.apply("kafka-topic-read", kafkaReader)
.apply("kafka-values", MapElements.into(TypeDescriptors.strings())
.via(record -> record.getKV().getValue()))
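Applying a multi-output `ParDo` yields a `PCollectionTuple`, from which each output (for example a main output and a dead-letter output) is fetched with its own typed `TupleTag`. As a rough, non-Beam analogy, the sketch below shows a container of differently typed outputs keyed by typed tags. `OutputTag` and `OutputTuple` are hypothetical names, not Beam classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical analogy of a typed tag (not Beam's TupleTag). Tags compare by
// identity, so each output should use one shared tag instance (a constant).
final class OutputTag<T> {
    final String id;

    OutputTag(String id) {
        this.id = id;
    }
}

// Hypothetical analogy of a PCollectionTuple: several differently typed
// outputs in one container, each retrieved with its own typed tag.
final class OutputTuple {
    private final Map<OutputTag<?>, List<?>> outputs = new HashMap<>();

    <T> void add(OutputTag<T> tag, T value) {
        // safe: only add() writes to a tag's list, and always with T values
        @SuppressWarnings("unchecked")
        List<T> list = (List<T>) outputs.computeIfAbsent(tag, t -> new ArrayList<T>());
        list.add(value);
    }

    // Returns the values for the tag, or an empty list if nothing was added.
    @SuppressWarnings("unchecked")
    <T> List<T> get(OutputTag<T> tag) {
        return (List<T>) outputs.getOrDefault(tag, new ArrayList<T>());
    }
}
```

The typed tag is what lets the caller get back a `List<String>` or `List<Integer>` without casting, which is the same reason Beam's tags carry a type parameter.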
brachi-wernick / ParDoWithTAg.java
Last active February 10, 2019 15:10
ParDo with a tag
public static final TupleTag<JsonNode> MAIN_OUT = new TupleTag<JsonNode>() {};
public static final TupleTag<BigQueryProcessError> DEADLETTER_OUT = new TupleTag<BigQueryProcessError>() {};
@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRowWithEvent> out) {
TableRow convertedRow = new TableRow();
try {
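The snippet is cut off at `try {`, so its exact error handling is not visible. A common shape for a dead-letter `DoFn` is: attempt the conversion, send the result to the main output, and on failure send the original element plus the error message to the dead-letter output, so one bad event does not fail the whole bundle. The JDK-only sketch below models that with two `Consumer`s (in Beam they would correspond to outputs obtained from a `MultiOutputReceiver`). A `Map<String, String>` stands in for `JsonNode`, a `Map<String, Object>` for `TableRow`, and `ProcessError` is a hypothetical stand-in for `BigQueryProcessError`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the gist's BigQueryProcessError.
final class ProcessError {
    final Map<String, String> input;
    final String message;

    ProcessError(Map<String, String> input, String message) {
        this.input = input;
        this.message = message;
    }
}

final class ConvertWithDeadLetter {
    // Hypothetical conversion: "server_time" must be present and a valid long.
    static Map<String, Object> convert(Map<String, String> element) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("server_time", Long.parseLong(element.get("server_time")));
        return row;
    }

    // Routes each element to exactly one output: the converted row to mainOut,
    // or the original element plus the error message to deadLetterOut.
    static void process(Map<String, String> element,
                        Consumer<Map<String, Object>> mainOut,
                        Consumer<ProcessError> deadLetterOut) {
        Map<String, Object> row;
        try {
            row = convert(element);
        } catch (RuntimeException e) {
            deadLetterOut.accept(new ProcessError(element, String.valueOf(e.getMessage())));
            return;
        }
        // Emitting outside the try block means only conversion failures are dead-lettered.
        mainOut.accept(row);
    }
}
```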
brachi-wernick / ParDoSimpleTransform.java
Last active February 6, 2019 21:13
simple transformation
@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRowWithEvent> out) {
TableRow convertedRow = new TableRow();
insertLong(element.get("server_time"), "server_time", convertedRow);
insertFloat(element.get("screen_dpi"), "screen_dpi", convertedRow);
// more transformation to come
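`insertLong` and `insertFloat` are the author's helpers and their bodies are not shown. A hypothetical version, assuming they skip missing values and otherwise parse the value into the column's type, might look like the sketch below. A `String` stands in for the `JsonNode` value and a `Map` for the `TableRow` (which is itself a map).

```java
import java.util.Map;

// Hypothetical versions of the gist's insertLong/insertFloat helpers.
// A String stands in for the JsonNode value, a Map for the TableRow.
final class RowHelpers {
    // Adds the value as a Long; absent or blank values are skipped so the
    // column stays NULL. Malformed numbers throw NumberFormatException.
    static void insertLong(String value, String field, Map<String, Object> row) {
        if (value == null || value.trim().isEmpty()) {
            return;
        }
        row.put(field, Long.parseLong(value.trim()));
    }

    // Stores a Double (a choice of this sketch) because BigQuery's
    // FLOAT/FLOAT64 column type is double precision.
    static void insertFloat(String value, String field, Map<String, Object> row) {
        if (value == null || value.trim().isEmpty()) {
            return;
        }
        row.put(field, Double.parseDouble(value.trim()));
    }
}
```

Letting malformed numbers throw, rather than silently skipping them, pairs naturally with a dead-letter output: the caller can catch the exception and route the whole event there.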
brachi-wernick / SimplePipline.java
Last active February 5, 2020 11:58
simple pipeline
p.apply("BQ-write", BigQueryIO.writeTableRows()
.to(tableSpec)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
brachi-wernick / SideInputWithTimeWindow.java
Created February 3, 2019 12:41
Side input with time windows
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-second windows
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();
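`GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1))` emits roughly one element per second, `FixedWindows` assigns each element to the single window that contains its timestamp, and the combiner reduces each window to one value. Each window therefore produces one "tick", which can presumably drive a periodic reload of the configuration read from the bucket. The JDK-only model below shows just the window arithmetic and a per-window count; the 60-second window size used with it is an arbitrary example.

```java
import java.util.Map;
import java.util.TreeMap;

// Model of fixed (tumbling) windows: each timestamp belongs to exactly one
// window [start, start + size). This is not Beam code, only the arithmetic.
final class FixedWindowModel {
    // Start of the window containing the timestamp (floorMod also handles
    // negative timestamps correctly).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Counts elements per window start, like a per-window Count combiner.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }
}
```

With one tick per second for 120 seconds and 60-second windows, the model yields two windows of 60 ticks each; shrinking the window to one second yields one tick per window.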
brachi-wernick / kafkaRabbit.csv
Last active December 2, 2018 10:53
Kafka vs. RabbitMQ
Feature,Kafka,RabbitMQ
Distributed consumer,v (consumer groups),v (competing consumers)
Distributed producer,v,v
Distributed queues,v (topic partitions),x
Clustering system,v (also ZooKeeper),x
Replicated queue,v (by default),v (by queue configuration)
Message persistence,v (by retention period),x (persistence guarantees aren't strong)
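The first row contrasts two ways of spreading consumption: a Kafka consumer group divides a topic's partitions among its members, while RabbitMQ's competing consumers all read from the same queue and each message goes to one of them. The sketch below illustrates only the Kafka side with a simple range-style assignment, similar in spirit to Kafka's `RangeAssignor` for a single topic; it is not the client library's code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a range-style partition assignment within one consumer group:
// each partition goes to exactly one consumer, and the first
// (partitions % consumers) consumers receive one extra partition.
final class RangeAssignmentSketch {
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }
}
```

With 5 partitions and 2 consumers, the first consumer owns partitions 0, 1 and 2 and the second owns 3 and 4. With more consumers than partitions, the extra consumers receive nothing, which is why a group's parallelism is capped by the partition count.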
brachi-wernick / buildRecordWithSchema.java
Created November 20, 2018 20:21
buildRecordWithSchema.java
private static GenericRecord buildRecord(LoginAttackCount loginAttackCount) throws IOException {
// avro schema avsc file path.
String schemaPath = "src/main/java/loginStream/login-attack-count.avsc";
String schemaString;
try (FileInputStream inputStream = new FileInputStream(schemaPath)) {
schemaString = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
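`buildRecord` reopens and rereads the `.avsc` file for every record. A minimal sketch of reading each schema file once and reusing the text, using only the JDK, is below. In the real code the cached value would more likely be the parsed Avro schema (for example the result of `new Schema.Parser().parse(schemaString)`), which this sketch does not include.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Reads each schema file once and returns the cached text afterwards,
// instead of reopening the .avsc file for every record.
final class SchemaTextCache {
    private static final ConcurrentMap<Path, String> CACHE = new ConcurrentHashMap<>();

    static String load(Path schemaPath) {
        return CACHE.computeIfAbsent(schemaPath, p -> {
            try {
                return new String(Files.readAllBytes(p), StandardCharsets.UTF_8);
            } catch (IOException e) {
                // Nothing is cached on failure, so a later call retries the read.
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

Note that a cached schema will not pick up edits to the file until the JVM restarts, which is usually acceptable for a schema bundled with the application.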
brachi-wernick / buildRecord.java
Created November 20, 2018 20:20
buildRecord.java
counts.mapValues((windowed, counter) ->
{
try {
return buildRecord(new LoginAttackCount(
windowed.key(),
counter,
windowed.window().start(),
windowed.window().end()));
} catch (IOException e) {
throw new RuntimeException(e);
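The `mapValues` lambda above has to catch `IOException` because `buildRecord` declares it, and Kafka Streams' `ValueMapperWithKey` cannot throw checked exceptions. A small hypothetical adapter can centralize that wrapping; the sketch below uses `UncheckedIOException`, which keeps the original exception as its cause. Since `ValueMapperWithKey` is its own interface rather than a `BiFunction`, the wrapped function would be called from a lambda such as `(windowed, counter) -> mapper.apply(windowed, counter)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.BiFunction;

// Hypothetical two-argument function type that may throw IOException.
@FunctionalInterface
interface IOBiFunction<A, B, R> {
    R apply(A a, B b) throws IOException;
}

// Hypothetical helper: adapts an IOBiFunction into a plain BiFunction so the
// try/catch lives in one place instead of in every mapper lambda.
final class Unchecked {
    static <A, B, R> BiFunction<A, B, R> biFunction(IOBiFunction<A, B, R> f) {
        return (a, b) -> {
            try {
                return f.apply(a, b);
            } catch (IOException e) {
                // UncheckedIOException keeps the original IOException as its cause
                throw new UncheckedIOException(e);
            }
        };
    }
}
```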