Skip to content

Instantly share code, notes, and snippets.

@fdmsantos
Created September 1, 2022 12:46
Show Gist options
Save fdmsantos/0f7d177e9cfba95f6d245f802d17e9a6 to your computer and use it in GitHub Desktop.
Java Flink
public class DataStreamJob {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WatermarkStrategy<Tuple3<String, Integer, Long>> strategy = WatermarkStrategy
.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f2);
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.assignTimestampsAndWatermarks(strategy)
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(40)))
.process(new MyProcessWindowFunction());
dataStream.print();
env.execute("Window WordCount");
public static class Splitter implements FlatMapFunction<String, Tuple3<String, Integer, Long>> {
@Override
public void flatMap(String sentence, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
String[] word = sentence.split(";");
out.collect(new Tuple3<String, Integer, Long>(word[0], 1, Long.parseLong(word[1])));
}
}
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer sum = 0;
for (Tuple3<String, Integer, Long> in : elements) {
sum++;
}
out.collect(new Tuple2<String, Integer>(s, sum));
Date date = new Date(context.window().getStart());
Date date2 = new Date(context.window().getEnd());
Date watermark = new Date(context.currentWatermark());
Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
System.out.println("Watermark: " + context.currentWatermark());
System.out.println("Current Timestamp: " + Instant.now().toString() + " Start Window: " + format.format(date) + " End Window: " + format.format(date2));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment