Skip to content

Instantly share code, notes, and snippets.

@mrsimpson
Created January 10, 2024 09:43
Show Gist options
  • Save mrsimpson/1d519299b8c171d7e228e8a3c7bc6ea3 to your computer and use it in GitHub Desktop.
Save mrsimpson/1d519299b8c171d7e228e8a3c7bc6ea3 to your computer and use it in GitHub Desktop.
Flink sorting with windows
package de.fermata.flink.app;
import java.time.Duration;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo job that re-orders an out-of-order stream of integers with tumbling event-time windows.
 *
 * <p>Each element's value doubles as its event timestamp (in milliseconds). Elements are collected
 * into windows of {@link #SORT_WINDOW_SIZE}; when the watermark passes the end of a window, the
 * window's contents are emitted in ascending order. Elements that arrive after their window has
 * already fired are routed to a side output and printed with a {@code late:} prefix.
 */
public class EventSortJob {
  /** How far the watermark trails the highest timestamp seen so far. */
  private static final Duration ALLOWED_LATENESS = Duration.ofMillis(200);

  /** Length of each tumbling window whose contents are sorted and emitted together. */
  private static final Duration SORT_WINDOW_SIZE = Duration.ofMillis(100);

  private static final Logger LOGGER = LoggerFactory.getLogger(EventSortJob.class);

  /**
   * Builds and runs the pipeline: source -> timestamps/watermarks -> keyed event-time window sort,
   * printing the original stream, the sorted stream and any late elements.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the Flink job fails to execute
   */
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SingleOutputStreamOperator<Integer> source =
        env.fromElements(0, 1, 2, 10, 9, 8, 3, 5, 4, 7, 6)
            .assignTimestampsAndWatermarks(
                new WatermarkStrategy<Integer>() {
                  @Override
                  public WatermarkGenerator<Integer> createWatermarkGenerator(
                      WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<Integer>() {
                      // Highest watermark emitted so far; watermarks must never go backwards.
                      private long watermark = Long.MIN_VALUE;

                      // punctuated watermarks are used here for demonstration purposes only!!!
                      @Override
                      public void onEvent(
                          Integer event, long eventTimestamp, WatermarkOutput output) {
                        // Delay the watermark behind the latest timestamp so that elements up to
                        // ALLOWED_LATENESS out of order are still assigned to their window.
                        long potentialWatermark = eventTimestamp - ALLOWED_LATENESS.toMillis();
                        if (potentialWatermark > watermark) {
                          watermark = potentialWatermark;
                          output.emitWatermark(new Watermark(watermark));
                          LOGGER.info("watermark = {}", watermark);
                        }
                      }

                      // normally, periodic watermarks should be used
                      @Override
                      public void onPeriodicEmit(WatermarkOutput output) {}
                    };
                  }

                  @Override
                  public TimestampAssigner<Integer> createTimestampAssigner(
                      TimestampAssignerSupplier.Context context) {
                    // for simplicity, element values are also timestamps (in millis)
                    return (element, recordTimestamp) -> element;
                  }
                });

    OutputTag<Integer> lateEventsTag = new OutputTag<Integer>("lateEventsTag") {};

    SingleOutputStreamOperator<Integer> sorted =
        source
            .keyBy(v -> 1) // single constant key: all elements are sorted together
            .window(
                // Event-time windows are required here: processing-time windows ignore the
                // timestamps and watermarks assigned above and never classify an element as
                // late, which would make the watermark logic and the late-data side output
                // ineffective and the firing time dependent on the wall clock.
                TumblingEventTimeWindows.of(Time.milliseconds(SORT_WINDOW_SIZE.toMillis())))
            .sideOutputLateData(lateEventsTag)
            .process(
                new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                  @Override
                  public void process(
                      Integer integer,
                      ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context,
                      Iterable<Integer> elements,
                      Collector<Integer> out) {
                    // Natural ordering of the values equals timestamp ordering in this demo.
                    StreamSupport.stream(elements.spliterator(), false)
                        .sorted()
                        .forEachOrdered(out::collect);
                  }
                });

    // Print the unsorted input for comparison with the sorted output.
    source.keyBy(v -> 1).map(v -> String.format("orig: %d", v)).addSink(new PrintSinkFunction<>());
    sorted.addSink(new PrintSinkFunction<>());
    // Elements whose window had already fired when they arrived.
    sorted
        .getSideOutput(lateEventsTag)
        .keyBy(v -> 1)
        .map(v -> String.format("late: %d", v))
        .addSink(new PrintSinkFunction<>());

    env.execute();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment