Created
September 1, 2022 12:46
-
-
Save fdmsantos/0f7d177e9cfba95f6d245f802d17e9a6 to your computer and use it in GitHub Desktop.
Java Flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DataStreamJob { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
WatermarkStrategy<Tuple3<String, Integer, Long>> strategy = WatermarkStrategy | |
.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) | |
.withTimestampAssigner((event, timestamp) -> event.f2); | |
DataStream<Tuple2<String, Integer>> dataStream = env | |
.socketTextStream("localhost", 9999) | |
.flatMap(new Splitter()) | |
.assignTimestampsAndWatermarks(strategy) | |
.keyBy(value -> value.f0) | |
.window(TumblingProcessingTimeWindows.of(Time.seconds(40))) | |
.process(new MyProcessWindowFunction()); | |
dataStream.print(); | |
env.execute("Window WordCount"); | |
public static class Splitter implements FlatMapFunction<String, Tuple3<String, Integer, Long>> { | |
@Override | |
public void flatMap(String sentence, Collector<Tuple3<String, Integer, Long>> out) throws Exception { | |
String[] word = sentence.split(";"); | |
out.collect(new Tuple3<String, Integer, Long>(word[0], 1, Long.parseLong(word[1]))); | |
} | |
} | |
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow> { | |
@Override | |
public void process(String s, ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception { | |
Integer sum = 0; | |
for (Tuple3<String, Integer, Long> in : elements) { | |
sum++; | |
} | |
out.collect(new Tuple2<String, Integer>(s, sum)); | |
Date date = new Date(context.window().getStart()); | |
Date date2 = new Date(context.window().getEnd()); | |
Date watermark = new Date(context.currentWatermark()); | |
Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss"); | |
System.out.println("Watermark: " + context.currentWatermark()); | |
System.out.println("Current Timestamp: " + Instant.now().toString() + " Start Window: " + format.format(date) + " End Window: " + format.format(date2)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment