Created January 10, 2024 09:43
-
-
Save mrsimpson/1d519299b8c171d7e228e8a3c7bc6ea3 to your computer and use it in GitHub Desktop.
Flink sorting with windows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.fermata.flink.app; | |
import java.time.Duration;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventSortJob { | |
private static final Duration ALLOWED_LATENESS = Duration.ofMillis(200); | |
private static final Duration SORT_WINDOW_SIZE = Duration.ofMillis(100); | |
private static final Logger LOGGER = LoggerFactory.getLogger(EventSortJob.class); | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
SingleOutputStreamOperator<Integer> source = | |
env.fromElements(0, 1, 2, 10, 9, 8, 3, 5, 4, 7, 6) | |
.assignTimestampsAndWatermarks( | |
new WatermarkStrategy<Integer>() { | |
@Override | |
public WatermarkGenerator<Integer> createWatermarkGenerator( | |
WatermarkGeneratorSupplier.Context context) { | |
return new WatermarkGenerator<Integer>() { | |
private long watermark = Long.MIN_VALUE; | |
// punctuated watermarks are used here for demonstration purposes only!!! | |
@Override | |
public void onEvent( | |
Integer event, long eventTimestamp, WatermarkOutput output) { | |
long potentialWatermark = | |
event | |
- ALLOWED_LATENESS | |
.toMillis(); // delay watermark behind latest timestamp | |
if (potentialWatermark > watermark) { | |
watermark = potentialWatermark; | |
output.emitWatermark(new Watermark(watermark)); | |
LOGGER.info("watermark = {}", watermark); | |
} | |
} | |
// normally, periodic watermarks should be used | |
@Override | |
public void onPeriodicEmit(WatermarkOutput output) {} | |
}; | |
} | |
@Override | |
public TimestampAssigner<Integer> createTimestampAssigner( | |
TimestampAssignerSupplier.Context context) { | |
return (element, recordTimestamp) -> | |
element; // for simplicity, element values are also timestamps (in millis) | |
} | |
}); | |
OutputTag<Integer> lateEventsTag = new OutputTag<Integer>("lateEventsTag") {}; | |
SingleOutputStreamOperator<Integer> sorted = | |
source | |
.keyBy(v -> 1) | |
.window( | |
TumblingProcessingTimeWindows.of(Time.milliseconds(SORT_WINDOW_SIZE.toMillis()))) | |
.sideOutputLateData(lateEventsTag) | |
.process( | |
new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() { | |
@Override | |
public void process( | |
Integer integer, | |
ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context, | |
Iterable<Integer> elements, | |
Collector<Integer> out) { | |
StreamSupport.stream(elements.spliterator(), false) | |
.sorted() | |
.forEachOrdered(out::collect); | |
} | |
}); | |
source.keyBy(v -> 1).map(v -> String.format("orig: %d", v)).addSink(new PrintSinkFunction<>()); | |
sorted.addSink(new PrintSinkFunction<>()); | |
sorted | |
.getSideOutput(lateEventsTag) | |
.keyBy(v -> 1) | |
.map(v -> String.format("late: %d", v)) | |
.addSink(new PrintSinkFunction<>()); | |
env.execute(); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.