Last active
August 12, 2019 04:03
-
-
Save Sam-Serpoosh/194068bd4e9fea9958bfae1cf618597b to your computer and use it in GitHub Desktop.
Testing TumblingEventTimeWindow of Flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.uber.gairos.flink.operator.flink_operator; | |
import lombok.Builder; | |
import lombok.Getter; | |
import lombok.Setter; | |
import lombok.NoArgsConstructor; | |
import lombok.AllArgsConstructor; | |
import lombok.ToString; | |
import org.apache.flink.api.common.ExecutionConfig; | |
import org.apache.flink.api.common.functions.AggregateFunction; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.sink.SinkFunction; | |
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; | |
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.junit.Before; | |
import org.junit.Test; | |
import java.io.Serializable; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.function.Function; | |
import static org.junit.Assert.assertEquals; | |
/** | |
* Expectations and explanations of these tests are laid out based | |
* on a conversation which you can find in the following StackOverflow | |
* question: | |
* | |
* - https://stackoverflow.com/questions/57121018/flink-windows-boundaries-watermark-event-timestamp-processing-time | |
* | |
* A few points about the layout of the events in the test examples' | |
* descriptions: | |
* | |
* - Only the EVENT TIME of the arriving events are laid out for brevity | |
* - The relative LEFT/RIGHT placement of the events along the X-axis | |
* indicates their ARRIVAL time with regard to WALL-CLOCK or | |
* PROCESSING TIME. | |
* | |
* Also, in the assertions you'll notice that the aggregated result
* of the LAST window is verified, even though the watermarks derived
* from the events alone would never pass that window's end:
*
* - End Of Window (EOW)
* - Current Watermark (CW)
*
* This happens because the sources used here are bounded: when a
* bounded source finishes, Flink emits a final watermark with the
* value Long.MAX_VALUE, which fires every window that is still
* pending before the job shuts down.
*/ | |
public class TumblingWindowAggregationBehaviorTest { | |
private final static String AGG_ID = "item_aggregator"; | |
private final static String CHI = "Chicago"; | |
private final static Time WIN_SIZE = Time.minutes(5); | |
// ALL the following times belong to Wednesday July 24th 2019 UTC | |
private final static Long UTC_801_AM = 1563955260000L; | |
private final static Long UTC_802_AM = 1563955320000L; | |
private final static Long UTC_804_AM = 1563955440000L; | |
private final static Long UTC_806_AM = 1563955560000L; | |
private final static Long UTC_808_AM = 1563955680000L; | |
private StreamExecutionEnvironment execEnv; | |
private ExecutionConfig execCfg; | |
/** | |
* We invoke setAutoWatermarkInterval with the value ZERO | |
* cause we want the emission of a watermark after EVERY EVENT | |
* as opposed to the normal behavior which emits watermarks | |
* periodically when dealing with Out-Of-Orderness assigners. | |
* | |
* This is NOT efficient in production, but we use it for | |
* our testing purposes to ensure watermarks are being | |
* emitted after each event and the stream is advancing in | |
* the eyes of Flink. | |
*/ | |
@Before | |
public void setupExecEnv() { | |
execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); | |
execCfg = execEnv.getConfig(); | |
execCfg.setAutoWatermarkInterval(0L); | |
execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
execEnv.setParallelism(1); | |
} | |
@Before | |
public void cleanUpCollector() { | |
CollectorSink.values.clear(); | |
} | |
/** | |
* The WINDOW configuration: | |
* | |
* - TumblingEventTimeWindows => 5 MINUTES SIZE | |
* - Max Out Of Orderness => ZERO | |
* - Allowed Lateness => DEFAULT (ZERO) | |
* | |
* That configuration means: | |
* | |
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far | |
* - End Of Window (EOW) | |
* | |
* Events arrive as below: | |
* | |
* - 8:04 => [8:00, 8:05) - 8:02 => [8:00, 8:05) - 8:06 => [8:05, 8:10) | |
* - CW = 8:04 - CW = 8:04 - CW = 8:06 | |
* | |
* When CW becomes 8:06 which is >= EOW for [8:00, 8:05), that | |
* window's result should be triggered/fired off AND the window | |
* itself should be discarded since: | |
* | |
* CW (8:06) >= EOW (8:04:59:999) + AllowedLateness (0) | |
* | |
* Finally, the overall windows' contents look like: | |
* | |
* - [8:00, 8:05): [item1, item2] | |
* - [8:05, 8:10): [item3] | |
* | |
*/ | |
@Test | |
public void whenZEROOutOfOrdernessAndZEROAllowedLateness() throws Exception { | |
Item item1 = Item | |
.builder() | |
.ts(UTC_804_AM) | |
.price(10.0d) | |
.city(CHI) | |
.build(); | |
Item item2 = Item | |
.builder() | |
.ts(UTC_802_AM) | |
.price(5.0d) | |
.city(CHI) | |
.build(); | |
Item item3 = Item | |
.builder() | |
.ts(UTC_806_AM) | |
.price(20.0d) | |
.city(CHI) | |
.build(); | |
DataStream<Item> items = execEnv | |
.fromElements(item1, item2, item3) | |
.assignTimestampsAndWatermarks( | |
new WatermarkAssigner<>(Time.seconds(0), Item::getTs) | |
); | |
DataStream<Item> agged = aggregateItems(items); | |
executeJob(agged); | |
assertEquals(2, CollectorSink.values.size()); | |
Item window1Res = CollectorSink.values.get(0); | |
Item window2Res = CollectorSink.values.get(1); | |
assertEquals(15, window1Res.getPrice().intValue()); // item1 & item2 | |
assertEquals(20, window2Res.getPrice().intValue()); // item 3 | |
} | |
/** | |
* The WINDOW configuration: | |
* | |
* - TumblingEventTimeWindows => 5 MINUTES SIZE | |
* - Max Out Of Orderness => 2 MINUTES | |
* - Allowed Lateness => DEFAULT (ZERO) | |
* | |
* That configuration means: | |
* | |
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far - max-out-of-orderness (2 minutes) | |
* - End Of Window (EOW) | |
* | |
* Events arrive as below: | |
* | |
* - 8:04 => [8:00, 8:05) - 8:06 => [8:05, 8:10) - 8:01 => [8:00, 8:05) - 8:08 => [8:05, 8:10) | |
* - CW = 8:04 - 2 = 8:02 - CW = 8:06 - 2 = 8:04 - CW = 8:06 - 2 = 8:04 - CW = 8:08 - 2 = 8:06 | |
* | |
* | |
* When CW becomes 8:06 which is >= EOW for [8:00, 8:05), that | |
* window's result should be triggered/fired off AND the window | |
* itself should be discarded since: | |
* | |
* CW (8:06) >= EOW (8:04:59:999) + AllowedLateness (0) | |
* | |
* Due to our MaxOutOfOrderness of 2 MINUTES, we're basically shifting | |
* back our CW by, well, 2 minutes. | |
* | |
* In the PREVIOUS test, the arrival of the event with event time of 8:06 | |
* triggered the result of window [8:00, 8:05). BUT, in this test, you can | |
* see that that the arrival of the event with event time of 8:08 triggered | |
* the result of that SAME window [8:00, 8:05). | |
* | |
* Finally, the overall windows' contents look like: | |
* | |
* - [8:00, 8:05): [item1, item3] | |
* - [8:05, 8:10): [item2, item4] | |
*/ | |
@Test | |
public void when2MinutesOutOfOrdernessAndZEROAllowedLateness() throws Exception { | |
Item item1 = Item | |
.builder() | |
.ts(UTC_804_AM) | |
.price(10.0d) | |
.city(CHI) | |
.build(); | |
Item item2 = Item | |
.builder() | |
.ts(UTC_806_AM) | |
.price(20.0d) | |
.city(CHI) | |
.build(); | |
Item item3 = Item | |
.builder() | |
.ts(UTC_801_AM) | |
.price(13.0d) | |
.city(CHI) | |
.build(); | |
Item item4 = Item | |
.builder() | |
.ts(UTC_808_AM) | |
.price(40.0d) | |
.city(CHI) | |
.build(); | |
DataStream<Item> items = execEnv | |
.fromElements(item1, item2, item3, item4) | |
.assignTimestampsAndWatermarks( | |
new WatermarkAssigner<>(Time.minutes(2), Item::getTs) | |
); | |
DataStream<Item> agged = aggregateItems(items); | |
executeJob(agged); | |
assertEquals(2, CollectorSink.values.size()); | |
Item window1Res = CollectorSink.values.get(0); | |
Item window2Res = CollectorSink.values.get(1); | |
assertEquals(23, window1Res.getPrice().intValue()); // item1 & item3 | |
assertEquals(60, window2Res.getPrice().intValue()); // item2 & item4 | |
} | |
/** | |
* The WINDOW configuration: | |
* | |
* - TumblingEventTimeWindows => 5 MINUTES SIZE | |
* - Max Out Of Orderness => ZERO | |
* - Allowed Lateness => 2 MINUTES | |
* | |
* That configuration means: | |
* | |
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far | |
* - End Of Window (EOW) | |
* | |
* Events arrive as below: | |
* | |
* - 8:04 => [8:00, 8:05) - 8:06 => [8:05, 8:10) - 8:01 => [8:00, 8:05) - 8:02 => [8:00, 8:05) - 8:08 => [8:05, 8:10) | |
* - CW = 8:04 - CW = 8:06 - CW = 8:06 - CW = 8:06 - CW = 8:08 | |
* | |
* | |
* When CW becomes 8:06 as the result of the arrival of the event | |
* with 8:06 event time, which is >= EOW for [8:00, 8:05), that | |
* window's result SHOULD BE triggered. | |
* | |
* But, since we have set an Allowed Lateness of 2 MINUTES, this window | |
* SHOULD BE KEPT and NOT discarded yet. While the EOW < CW < (EOW + AL), | |
* ANY EVENT which arrives and belongs to [8:00, 8:05), SHOULD cause an | |
* updated result for THAT window to be fired. | |
* | |
* This means the windows' states will be like the following: | |
* | |
* - [8:00, 8:05): [item1] => result1 | |
* - [8:05, 8:10): [item2] | |
* - [8:00, 8:05): [item1, item3] => result2 | |
* - [8:00, 8:05): [item1, item3, item4] => result3 | |
* - [8:05, 8:10): [item2, item5] => result4 | |
* | |
* When events with EVENT_TIME of 8:08 arrives, it'll push | |
* the CW to become 8:08. At which point we'll have: | |
* | |
* CW (8:08) >= EOW (8:04:59:999) + AllowedLateness (2) | |
* | |
* Hence, the window [8:00, 8:05) will be discarded. Any event | |
* arriving AFTER this point and belonging to [8:00, 8:05) will | |
* be considered TOO LATE and discarded. | |
* | |
*/ | |
@Test | |
public void whenZEROOutOfOrdernessAnd2MinutesAllowedLateness() throws Exception { | |
Item item1 = Item | |
.builder() | |
.ts(UTC_804_AM) | |
.price(10.0d) | |
.city(CHI) | |
.build(); | |
Item item2 = Item | |
.builder() | |
.ts(UTC_806_AM) | |
.price(20.0d) | |
.city(CHI) | |
.build(); | |
Item item3 = Item | |
.builder() | |
.ts(UTC_801_AM) | |
.price(13.0d) | |
.city(CHI) | |
.build(); | |
Item item4 = Item | |
.builder() | |
.ts(UTC_802_AM) | |
.price(7.0d) | |
.city(CHI) | |
.build(); | |
Item item5 = Item | |
.builder() | |
.ts(UTC_808_AM) | |
.price(40.0d) | |
.city(CHI) | |
.build(); | |
DataStream<Item> items = execEnv | |
.fromElements(item1, item2, item3, item4, item5) | |
.assignTimestampsAndWatermarks( | |
new WatermarkAssigner<>(Time.seconds(0), Item::getTs) | |
); | |
DataStream<Item> agged = aggregateItemsWithAllowedLateness(items, Time.minutes(2)); | |
executeJob(agged); | |
/* | |
* Unfortunately the following ASSERTIONS FAIL and we are | |
* NOT seeing the EXPECTED behavior laid out in the description | |
* of this test!!! ¯\_(ツ)_/¯ | |
*/ | |
/* | |
assertEquals(4, CollectorSink.values.size()); | |
Item window1Res = CollectorSink.values.get(0); | |
Item window2Res = CollectorSink.values.get(1); | |
Item window3Res = CollectorSink.values.get(2); | |
Item window4Res = CollectorSink.values.get(3); | |
assertEquals(10, window1Res.getPrice().intValue()); // item1 | |
assertEquals(23, window2Res.getPrice().intValue()); // item1 & item3 | |
assertEquals(30, window3Res.getPrice().intValue()); // item1, item3 & item4 | |
assertEquals(60, window4Res.getPrice().intValue()); // item2 & item5 | |
*/ | |
/* | |
* Instead, we're getting the behavior shown by the | |
* following ASSERTIONS. Which again, is a violation | |
* of our expectation and understanding wiht regard to | |
* how Allowed-Lateness behaves. Unless that understanding | |
* is FLAWED!!! ¯\_(ツ)_/¯ | |
*/ | |
assertEquals(2, CollectorSink.values.size()); | |
Item window1Res = CollectorSink.values.get(0); | |
Item window2Res = CollectorSink.values.get(1); | |
assertEquals(30, window1Res.getPrice().intValue()); // item1, item3 & item4 | |
assertEquals(60, window2Res.getPrice().intValue()); // item2 & item5 | |
} | |
// Naturally the next test case would be something like: | |
// | |
// - Tumbling Window : 5 minutes size | |
// - Max Out Of Orderness: 2 minutes | |
// - Allowed Lateness : 1 minute | |
// | |
// That one is left as an exercise for whoever is interested ;) | |
private DataStream<Item> aggregateItems(DataStream<Item> items) { | |
return items | |
.keyBy((KeySelector<Item, String>) Item::getCity) | |
.window(TumblingEventTimeWindows.of(WIN_SIZE)) | |
.aggregate(new ItemAggregator()) | |
.uid(AGG_ID); | |
} | |
private DataStream<Item> aggregateItemsWithAllowedLateness(DataStream<Item> items, Time lateness) { | |
return items | |
.keyBy((KeySelector<Item, String>) Item::getCity) | |
.window(TumblingEventTimeWindows.of(WIN_SIZE)) | |
.allowedLateness(lateness) | |
.aggregate(new ItemAggregator()) | |
.uid(AGG_ID); | |
} | |
/** | |
* @param opOutput The output of an operator can be captured via this | |
* Collector Sink and evaluated for verification purposes. | |
*/ | |
protected void executeJob(DataStream<Item> opOutput) throws Exception { | |
opOutput.addSink(new CollectorSink()); | |
execEnv.execute(); | |
} | |
protected static class CollectorSink implements SinkFunction<Item> { | |
public static final List<Item> values = new ArrayList<>(); | |
@Override | |
public synchronized void invoke(Item item) { | |
values.add(item); | |
} | |
} | |
} | |
class ItemAggregator implements AggregateFunction<Item, Item, Item> { | |
@Override | |
public Item createAccumulator() { | |
return Item | |
.builder() | |
.ts(null) | |
.price(0.0d) | |
.build(); | |
} | |
@Override | |
public Item add(Item item, Item accumulator) { | |
// For the FIRST time, BRAND NEW accumulator | |
// EACH WINDOW will have its OWN accumulator | |
if (accumulator.getTs() == null) { | |
accumulator.setTs(item.getTs()); | |
accumulator.setCity(item.getCity()); | |
} else if (accumulator.getTs() < item.getTs()) { | |
accumulator.setTs(item.getTs()); | |
} | |
accumulator.addUp(item); | |
return accumulator; | |
} | |
@Override | |
public Item getResult(Item accumulator) { | |
return accumulator; | |
} | |
@Override | |
public Item merge(Item item, Item accumulator) { | |
Item merged = Item | |
.builder() | |
.city(accumulator.getCity()) | |
.price(accumulator.getPrice()) | |
.ts(accumulator.getTs()) | |
.build(); | |
merged.addUp(item); | |
return merged; | |
} | |
} | |
class WatermarkAssigner<T> extends BoundedOutOfOrdernessTimestampExtractor<T> { | |
private final SerializableFunc<T, Long> eventTimeExtractor; | |
WatermarkAssigner( | |
Time maxOutOfOrderness, | |
final SerializableFunc<T, Long> eventTimeExtractor | |
) { | |
super(maxOutOfOrderness); | |
this.eventTimeExtractor = eventTimeExtractor; | |
} | |
@Override | |
public long extractTimestamp(T event) { | |
return eventTimeExtractor.apply(event); | |
} | |
} | |
@Builder | |
@Getter | |
@Setter | |
@NoArgsConstructor | |
@AllArgsConstructor | |
@ToString | |
class Item { | |
private Long ts; | |
private Double price; | |
private String city; | |
void addUp(Item other) { | |
this.price += other.price; | |
} | |
} | |
/**
 * A {@link Function} that is also {@link Serializable}, so that lambdas and
 * method references assigned to it can be serialized along with the object
 * that holds them.
 *
 * @param <IN>  argument type
 * @param <OUT> result type
 */
@FunctionalInterface
interface SerializableFunc<IN, OUT> extends Function<IN, OUT>, Serializable {
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment