Testing TumblingEventTimeWindow of Flink
Gist by Sam-Serpoosh, last active August 12, 2019 04:03

package com.uber.gairos.flink.operator.flink_operator;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import static org.junit.Assert.assertEquals;

/**
 * Expectations and explanations of these tests are laid out based
 * on a conversation which you can find in the following StackOverflow
 * question:
 * -
 *
 * A few points about the layout of the events in the test examples'
 * descriptions:
 * - Only the EVENT TIME of each arriving event is laid out, for brevity.
 * - The relative LEFT/RIGHT placement of the events along the X-axis
 *   indicates their ARRIVAL time with regard to WALL-CLOCK or
 *
 * Also, in the assertions you'll notice that the outcome/aggregated result
 * of the LAST window is verified, EVEN THOUGH, technically, the result
 * of the LAST window should NOT be triggered given the state of:
 * - End Of Window (EOW)
 * - Current Watermark (CW)
 * This happens because Flink FLUSHES the remaining windows once the job's
 * bounded input is exhausted: when a source such as fromElements finishes,
 * a final watermark of Long.MAX_VALUE is emitted, which fires every
 * event-time window that is still open before the job wraps up.
 */
public class TumblingWindowAggregationBehaviorTest {
    private final static String AGG_ID = "item_aggregator";
    private final static String CHI = "Chicago";
    private final static Time WIN_SIZE = Time.minutes(5);

    // ALL the following times belong to Wednesday, July 24th 2019 (UTC)
    private final static Long UTC_801_AM = 1563955260000L;
    private final static Long UTC_802_AM = 1563955320000L;
    private final static Long UTC_804_AM = 1563955440000L;
    private final static Long UTC_806_AM = 1563955560000L;
    private final static Long UTC_808_AM = 1563955680000L;

    private StreamExecutionEnvironment execEnv;
    private ExecutionConfig execCfg;

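    /**
     * We invoke setAutoWatermarkInterval with the value ZERO because the
     * INTENT is to emit a watermark after EVERY EVENT, as opposed to the
     * normal behavior of Out-Of-Orderness assigners, which emit watermarks
     * periodically. This would NOT be efficient in production; the goal
     * here is only to make sure, for testing purposes, that a watermark
     * follows each event and the stream keeps advancing in the eyes of Flink.
     *
     * NOTE: an interval of ZERO actually DISABLES periodic watermark
     * emission: the operator behind a periodic assigner such as
     * BoundedOutOfOrdernessTimestampExtractor only schedules calls to
     * getCurrentWatermark() when the interval is greater than 0. So the only
     * watermark that reaches the windows in these tests is the final
     * Long.MAX_VALUE one emitted when the bounded source finishes (and even
     * with a positive interval, a tiny bounded source may be consumed before
     * the first periodic watermark). Emitting a watermark after every event
     * requires a punctuated assigner (AssignerWithPunctuatedWatermarks).
     */
    @Before
    public void setupExecEnv() {
        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execCfg = execEnv.getConfig();
        execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Must come AFTER setStreamTimeCharacteristic, which resets the interval
        // (to 200 ms for EventTime).
        execCfg.setAutoWatermarkInterval(0);
    }

    @After
    public void cleanUpCollector() {
        CollectorSink.values.clear();
    }
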
    /**
     * The WINDOW configuration:
     * - TumblingEventTimeWindows => 5 MINUTES SIZE
     * - Max Out Of Orderness => ZERO
     * - Allowed Lateness => DEFAULT (ZERO)
     *
     * That configuration means:
     * - CURRENT WATERMARK (CW) = max-event-time-seen-so-far
     * - End Of Window (EOW) = window end - 1 ms (e.g. 8:04:59.999 for [8:00, 8:05))
     *
     * Events arrive as below:
     * - 8:04 => [8:00, 8:05)   - 8:02 => [8:00, 8:05)   - 8:06 => [8:05, 8:10)
     * - CW = 8:04              - CW = 8:04              - CW = 8:06
     *
     * When CW becomes 8:06, which is >= EOW for [8:00, 8:05), that
     * window's result should be triggered/fired off AND the window
     * itself should be discarded since:
     *   CW (8:06) >= EOW (8:04:59.999) + AllowedLateness (0)
     *
     * Finally, the overall windows' contents look like:
     * - [8:00, 8:05): [item1, item2]
     * - [8:05, 8:10): [item3]
     */
    @Test
    public void whenZEROOutOfOrdernessAndZEROAllowedLateness() throws Exception {
        Item item1 = Item
        Item item2 = Item
        Item item3 = Item

        DataStream<Item> items = execEnv
            .fromElements(item1, item2, item3)
            .assignTimestampsAndWatermarks(
                new WatermarkAssigner<>(Time.seconds(0), Item::getTs)
            );
        DataStream<Item> agged = aggregateItems(items);
        executeJob(agged);

        assertEquals(2, CollectorSink.values.size());
        Item window1Res = CollectorSink.values.get(0);
        Item window2Res = CollectorSink.values.get(1);
        assertEquals(15, window1Res.getPrice().intValue()); // item1 & item2
        assertEquals(20, window2Res.getPrice().intValue()); // item3
    }

    /**
     * The WINDOW configuration:
     * - TumblingEventTimeWindows => 5 MINUTES SIZE
     * - Max Out Of Orderness => 2 MINUTES
     * - Allowed Lateness => DEFAULT (ZERO)
     *
     * That configuration means:
     * - CURRENT WATERMARK (CW) = max-event-time-seen-so-far - max-out-of-orderness (2 minutes)
     * - End Of Window (EOW)
     *
     * Events arrive as below:
     * - 8:04 => [8:00, 8:05)   - 8:06 => [8:05, 8:10)   - 8:01 => [8:00, 8:05)   - 8:08 => [8:05, 8:10)
     * - CW = 8:04 - 2 = 8:02   - CW = 8:06 - 2 = 8:04   - CW = 8:06 - 2 = 8:04   - CW = 8:08 - 2 = 8:06
     *
     * When CW becomes 8:06, which is >= EOW for [8:00, 8:05), that
     * window's result should be triggered/fired off AND the window
     * itself should be discarded since:
     *   CW (8:06) >= EOW (8:04:59.999) + AllowedLateness (0)
     *
     * Due to our MaxOutOfOrderness of 2 MINUTES, we're basically shifting
     * our CW back by, well, 2 minutes.
     *
     * In the PREVIOUS test, the arrival of the event with event time 8:06
     * triggered the result of window [8:00, 8:05). In this test, by contrast,
     * it is the arrival of the event with event time 8:08 that triggers the
     * result of that SAME window [8:00, 8:05).
     *
     * Finally, the overall windows' contents look like:
     * - [8:00, 8:05): [item1, item3]
     * - [8:05, 8:10): [item2, item4]
     */
    @Test
    public void when2MinutesOutOfOrdernessAndZEROAllowedLateness() throws Exception {
        Item item1 = Item
        Item item2 = Item
        Item item3 = Item
        Item item4 = Item

        DataStream<Item> items = execEnv
            .fromElements(item1, item2, item3, item4)
            .assignTimestampsAndWatermarks(
                new WatermarkAssigner<>(Time.minutes(2), Item::getTs)
            );
        DataStream<Item> agged = aggregateItems(items);
        executeJob(agged);

        assertEquals(2, CollectorSink.values.size());
        Item window1Res = CollectorSink.values.get(0);
        Item window2Res = CollectorSink.values.get(1);
        assertEquals(23, window1Res.getPrice().intValue()); // item1 & item3
        assertEquals(60, window2Res.getPrice().intValue()); // item2 & item4
    }

    /**
     * The WINDOW configuration:
     * - TumblingEventTimeWindows => 5 MINUTES SIZE
     * - Max Out Of Orderness => ZERO
     * - Allowed Lateness => 2 MINUTES
     *
     * That configuration means:
     * - CURRENT WATERMARK (CW) = max-event-time-seen-so-far
     * - End Of Window (EOW)
     *
     * Events arrive as below:
     * - 8:04 => [8:00, 8:05)   - 8:06 => [8:05, 8:10)   - 8:01 => [8:00, 8:05)   - 8:02 => [8:00, 8:05)   - 8:08 => [8:05, 8:10)
     * - CW = 8:04              - CW = 8:06              - CW = 8:06              - CW = 8:06              - CW = 8:08
     *
     * When CW becomes 8:06 as a result of the arrival of the event with
     * event time 8:06, which is >= EOW for [8:00, 8:05), that window's
     * result SHOULD BE triggered.
     *
     * But, since we have set an Allowed Lateness (AL) of 2 MINUTES, this
     * window SHOULD BE KEPT and NOT discarded yet. While
     * EOW <= CW < EOW + AL, ANY EVENT that arrives and belongs to
     * [8:00, 8:05) SHOULD cause an updated result for THAT window to be fired.
     *
     * This means the windows' states will be like the following:
     * - [8:00, 8:05): [item1] => result1
     * - [8:05, 8:10): [item2]
     * - [8:00, 8:05): [item1, item3] => result2
     * - [8:00, 8:05): [item1, item3, item4] => result3
     * - [8:05, 8:10): [item2, item5] => result4
     *
     * When the event with EVENT TIME 8:08 arrives, it pushes the CW to
     * 8:08, at which point we have:
     *   CW (8:08) >= EOW (8:04:59.999) + AllowedLateness (2 minutes)
     * Hence, the window [8:00, 8:05) will be discarded. Any event
     * arriving AFTER this point and belonging to [8:00, 8:05) will
     * be considered TOO LATE and dropped.
     */
    @Test
    public void whenZEROOutOfOrdernessAnd2MinutesAllowedLateness() throws Exception {
        Item item1 = Item
        Item item2 = Item
        Item item3 = Item
        Item item4 = Item
        Item item5 = Item

        DataStream<Item> items = execEnv
            .fromElements(item1, item2, item3, item4, item5)
            .assignTimestampsAndWatermarks(
                new WatermarkAssigner<>(Time.seconds(0), Item::getTs)
            );
        DataStream<Item> agged = aggregateItemsWithAllowedLateness(items, Time.minutes(2));
        executeJob(agged);

        /*
         * Unfortunately, the following ASSERTIONS FAIL and we are
         * NOT seeing the EXPECTED behavior laid out in the description
         * of this test!!! ¯\_(ツ)_/¯
         */
        // assertEquals(4, CollectorSink.values.size());
        // Item window1Res = CollectorSink.values.get(0);
        // Item window2Res = CollectorSink.values.get(1);
        // Item window3Res = CollectorSink.values.get(2);
        // Item window4Res = CollectorSink.values.get(3);
        // assertEquals(10, window1Res.getPrice().intValue()); // item1
        // assertEquals(23, window2Res.getPrice().intValue()); // item1 & item3
        // assertEquals(30, window3Res.getPrice().intValue()); // item1, item3 & item4
        // assertEquals(60, window4Res.getPrice().intValue()); // item2 & item5

        /*
         * Instead, we're getting the behavior shown by the
         * following ASSERTIONS, which, again, violates our
         * expectation and understanding with regard to
         * how Allowed Lateness behaves. Unless that understanding
         * is FLAWED!!! ¯\_(ツ)_/¯
         */
        assertEquals(2, CollectorSink.values.size());
        Item window1Res = CollectorSink.values.get(0);
        Item window2Res = CollectorSink.values.get(1);
        assertEquals(30, window1Res.getPrice().intValue()); // item1, item3 & item4
        assertEquals(60, window2Res.getPrice().intValue()); // item2 & item5
    }

    // Naturally, the next test case would be something like:
    // - Tumbling Window     : 5 minutes size
    // - Max Out Of Orderness: 2 minutes
    // - Allowed Lateness    : 1 minute
    // That one is left as an exercise for whoever is interested ;)

    private DataStream<Item> aggregateItems(DataStream<Item> items) {
        return items
            .keyBy((KeySelector<Item, String>) Item::getCity)
            .window(TumblingEventTimeWindows.of(WIN_SIZE))
            .aggregate(new ItemAggregator());
    }

    private DataStream<Item> aggregateItemsWithAllowedLateness(DataStream<Item> items, Time lateness) {
        return items
            .keyBy((KeySelector<Item, String>) Item::getCity)
            .window(TumblingEventTimeWindows.of(WIN_SIZE))
            .allowedLateness(lateness)
            .aggregate(new ItemAggregator());
    }

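    /**
     * Attaches a CollectorSink to the given operator output and runs the job.
     *
     * @param opOutput The output of an operator; it is captured via the
     *                 CollectorSink and evaluated for verification purposes.
     */
    protected void executeJob(DataStream<Item> opOutput) throws Exception {
        opOutput.addSink(new CollectorSink());
        execEnv.execute();
    }

    protected static class CollectorSink implements SinkFunction<Item> {
        public static final List<Item> values = new ArrayList<>();

        @Override
        public void invoke(Item item) {
            // values is static and shared by every parallel sink instance,
            // so lock on the list itself rather than on this instance.
            synchronized (values) {
                values.add(item);
            }
        }
    }
}
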
class ItemAggregator implements AggregateFunction<Item, Item, Item> {
public Item createAccumulator() {
return Item
public Item add(Item item, Item accumulator) {
// For the FIRST time, BRAND NEW accumulator
// EACH WINDOW will have its OWN accumulator
if (accumulator.getTs() == null) {
} else if (accumulator.getTs() < item.getTs()) {
return accumulator;
public Item getResult(Item accumulator) {
return accumulator;
public Item merge(Item item, Item accumulator) {
Item merged = Item
return merged;
class WatermarkAssigner<T> extends BoundedOutOfOrdernessTimestampExtractor<T> {
    private final SerializableFunc<T, Long> eventTimeExtractor;

    WatermarkAssigner(
        Time maxOutOfOrderness,
        final SerializableFunc<T, Long> eventTimeExtractor
    ) {
        super(maxOutOfOrderness);
        this.eventTimeExtractor = eventTimeExtractor;
    }

    @Override
    public long extractTimestamp(T event) {
        return eventTimeExtractor.apply(event);
    }
}

@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
class Item {
    private Long ts;
    private Double price;
    private String city;

    void addUp(Item other) {
        this.price += other.price;
    }
}

interface SerializableFunc<IN, OUT> extends Function<IN, OUT>, Serializable {}
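
The window arithmetic behind the first test's description (which window an event lands in, and when CW reaches that window's EOW) can be checked without Flink at all. Below is a minimal, stdlib-only sketch, assuming a zero window offset and non-negative epoch-millisecond timestamps; `TumblingWindowMath` and its methods are hypothetical helpers written for illustration, not Flink APIs.

```java
import java.time.Instant;

/** Hypothetical helper: tumbling event-time window arithmetic (zero offset assumed). */
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start of the tumbling window containing ts, i.e. ts rounded down to a multiple of size. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size); // only valid for non-negative timestamps
    }

    /** End Of Window (EOW): the last millisecond that still belongs to [start, start + size). */
    static long endOfWindow(long start, long size) {
        return start + size - 1;
    }

    /** An event-time window fires once the current watermark reaches its EOW. */
    static boolean fires(long currentWatermark, long start, long size) {
        return currentWatermark >= endOfWindow(start, size);
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;       // 5-minute tumbling windows
        long utc804 = 1563955440000L;  // UTC_804_AM from the test
        long utc806 = 1563955560000L;  // UTC_806_AM from the test
        long start = windowStart(utc804, size);
        System.out.println(Instant.ofEpochMilli(start));                    // 2019-07-24T08:00:00Z
        System.out.println(Instant.ofEpochMilli(endOfWindow(start, size))); // 2019-07-24T08:04:59.999Z
        System.out.println(fires(utc804, start, size));                     // false
        System.out.println(fires(utc806, start, size));                     // true
    }
}
```

With the UTC_804_AM and UTC_806_AM constants from the test, the 8:04 event lands in the window starting at 8:00, whose EOW is 8:04:59.999; a CW of 8:04 does not fire it, while a CW of 8:06 does.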
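
For the second test, the interesting part is how a 2-minute max out-of-orderness delays the CW. The sketch below recomputes CW after each event, ASSUMING a watermark follows every event (the behavior the tests are aiming for) and using the CW = max-event-time-seen-so-far - max-out-of-orderness formula from the test's description. `OutOfOrdernessWatermarks` is a hypothetical helper; event times are millisecond offsets from 8:00 for readability.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: bounded-out-of-orderness watermarks, ASSUMING one
 * watermark is emitted after every event (CW = max event time seen - max delay).
 */
final class OutOfOrdernessWatermarks {
    private OutOfOrdernessWatermarks() {}

    /** CW after each event; it never moves backwards because the max only grows. */
    static List<Long> watermarksAfterEachEvent(long[] eventTimes, long maxDelay) {
        List<Long> watermarks = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            maxSeen = Math.max(maxSeen, ts);
            watermarks.add(maxSeen - maxDelay);
        }
        return watermarks;
    }

    /** Index of the first event after which CW >= eow, or -1 if that never happens. */
    static int firstFiringEvent(List<Long> watermarks, long eow) {
        for (int i = 0; i < watermarks.size(); i++) {
            if (watermarks.get(i) >= eow) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // The second test's event times, as offsets from 8:00: 8:04, 8:06, 8:01, 8:08
        long[] events = {4 * min, 6 * min, 1 * min, 8 * min};
        List<Long> cw = watermarksAfterEachEvent(events, 2 * min);
        long eow = 5 * min - 1; // EOW of [8:00, 8:05)
        System.out.println(cw);                        // [120000, 240000, 240000, 360000]
        System.out.println(firstFiringEvent(cw, eow)); // 3, i.e. the 8:08 event
    }
}
```

With a max delay of 0, the same events would fire [8:00, 8:05) at the second event (8:06) instead, just like in the first test.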
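
The third test is where expectation and reality diverge, and a small simulation makes both outcomes concrete. The sketch below models a single key with tumbling windows, allowed lateness and bounded out-of-orderness, under these stated assumptions: each event is evaluated against the watermark emitted before it; a window fires when CW >= EOW; every accepted late event fires an updated result immediately; a window is discarded once CW >= EOW + AL; and the bounded input ends with a Long.MAX_VALUE watermark. The `perEventWatermarks` flag switches between "a watermark after every event" (what the test description assumes) and "no watermark until the end of input" (what a periodic assigner with a zero auto-watermark interval produces). `LatenessSimulator` is a hypothetical illustration, not Flink's window operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical single-key simulation of tumbling event-time windows with
 * allowed lateness (AL) and bounded out-of-orderness. Times are millisecond
 * offsets from 8:00; firings are recorded as "<window start in minutes>m <items>".
 */
final class LatenessSimulator {
    private final long size;
    private final long allowedLateness;
    private final long maxDelay;
    private final boolean perEventWatermarks;
    private final TreeMap<Long, List<String>> windows = new TreeMap<>(); // window start -> item ids
    private final Set<Long> reachedEow = new HashSet<>(); // windows whose EOW the CW has passed
    private long watermark = Long.MIN_VALUE;
    private long maxSeen = Long.MIN_VALUE;
    final List<String> firings = new ArrayList<>();

    LatenessSimulator(long size, long allowedLateness, long maxDelay, boolean perEventWatermarks) {
        this.size = size;
        this.allowedLateness = allowedLateness;
        this.maxDelay = maxDelay;
        this.perEventWatermarks = perEventWatermarks;
    }

    void onEvent(String id, long ts) {
        long start = ts - (ts % size);
        long eow = start + size - 1;
        if (eow + allowedLateness <= watermark) {
            return; // TOO LATE: the window is already discarded, so the event is dropped
        }
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(id);
        if (eow <= watermark) {
            reachedEow.add(start);
            fire(start); // late but within AL: fire an updated result immediately
        }
        maxSeen = Math.max(maxSeen, ts);
        if (perEventWatermarks) {
            advanceWatermark(maxSeen - maxDelay); // the watermark follows the event
        }
    }

    /** A bounded source ends with a final Long.MAX_VALUE watermark. */
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    private void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // a watermark never moves backwards
        }
        watermark = newWatermark;
        for (long start : new ArrayList<>(windows.keySet())) {
            long eow = start + size - 1;
            if (eow <= watermark && reachedEow.add(start)) {
                fire(start); // regular firing: CW >= EOW for the first time
            }
            if (eow + allowedLateness <= watermark) {
                windows.remove(start); // CW >= EOW + AL: discard the window
                reachedEow.remove(start);
            }
        }
    }

    private void fire(long start) {
        firings.add((start / 60_000L) + "m " + windows.get(start));
    }

    public static void main(String[] args) {
        long min = 60_000L;
        for (boolean perEvent : new boolean[] {true, false}) {
            LatenessSimulator sim = new LatenessSimulator(5 * min, 2 * min, 0, perEvent);
            // The third test's events, in arrival order: 8:04, 8:06, 8:01, 8:02, 8:08
            sim.onEvent("item1", 4 * min);
            sim.onEvent("item2", 6 * min);
            sim.onEvent("item3", 1 * min);
            sim.onEvent("item4", 2 * min);
            sim.onEvent("item5", 8 * min);
            sim.endOfInput();
            System.out.println(sim.firings);
        }
    }
}
```

With `perEventWatermarks` set to true, the five events of the third test produce the four firings listed in its description (result1 through result4); with false, they produce only the two firings that the test actually observes ([item1, item3, item4] and [item2, item5]). Pointing the same simulator at other settings, such as the 2-minute out-of-orderness and 1-minute lateness case suggested after the tests, is a cheap way to sanity-check expectations before writing the Flink test.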
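
Finally, the aggregation itself follows Flink's AggregateFunction contract: createAccumulator, add, getResult and merge, with one accumulator per key and window. The stdlib-only sketch below mirrors that shape, ASSUMING the aggregate sums prices and keeps the latest event time seen; `PriceAggregationSketch` and its `Acc` type are hypothetical and are not a reconstruction of ItemAggregator.

```java
/**
 * Hypothetical stdlib-only sketch shaped like Flink's AggregateFunction
 * (createAccumulator / add / getResult / merge). ASSUMPTION: the aggregate
 * sums prices and keeps the latest event time seen in the window.
 */
final class PriceAggregationSketch {
    private PriceAggregationSketch() {}

    /** One accumulator per (key, window); ts stays null until the first add. */
    static final class Acc {
        Long ts;
        double price;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(long ts, double price, Acc acc) {
        if (acc.ts == null || acc.ts < ts) {
            acc.ts = ts; // first element, or a later event time than any seen so far
        }
        acc.price += price;
        return acc;
    }

    static double getResult(Acc acc) {
        return acc.price;
    }

    /** Combines two partial accumulators, e.g. when two windows merge. */
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        if (a.ts == null) {
            merged.ts = b.ts;
        } else if (b.ts == null) {
            merged.ts = a.ts;
        } else {
            merged.ts = Math.max(a.ts, b.ts);
        }
        merged.price = a.price + b.price;
        return merged;
    }
}
```

Feeding it the third test's item1, item3 and item4 in arrival order (prices 10, 13 and 7, as implied by its expected results 10, 23 and 30) yields 30.0 with an event time of 8:04. Flink calls merge only when window states themselves merge (for example, session windows), so for tumbling windows it should be effectively unused.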