Created
April 23, 2016 14:55
-
-
Save krolen/ed1344e4d7be5b2116061685268651f5 to your computer and use it in GitHub Desktop.
Flink - counting trigger that correctly flushes all windows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FinishingCountTrigger<W extends Window> extends Trigger<Object, W> { | |
private static final long serialVersionUID = 1L; | |
private final long maxCount; | |
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L); | |
public FinishingCountTrigger(long maxCount) { | |
this.maxCount = maxCount; | |
} | |
@Override | |
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException { | |
ValueState<Long> count = ctx.getPartitionedState(stateDesc); | |
long currentCount = count.value() + 1; | |
if(currentCount == 1) { | |
ctx.registerEventTimeTimer(Long.MAX_VALUE - 1); | |
} | |
count.update(currentCount); | |
if (currentCount >= maxCount) { | |
count.update(0L); | |
ctx.deleteEventTimeTimer(Long.MAX_VALUE - 1); | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { | |
if(time == Long.MAX_VALUE - 1) { | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear(W window, TriggerContext ctx) throws Exception { | |
ctx.getPartitionedState(stateDesc).clear(); | |
} | |
@Override | |
public String toString() { | |
return "FinishingCountTrigger(" + maxCount + ")"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo job: a finite source emits 43 tuples that all share the key 0L; the keyed stream is
// put into a global window that uses FinishingCountTrigger with a batch size of 5.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
    @Override
    public void run(SourceContext<Tuple2<Long, String>> sourceContext) throws Exception {
        // 43 is deliberately not a multiple of the batch size, leaving a partial last batch.
        for (long i = 0; i < 43; i++) {
            sourceContext.collect(Tuple2.of(0L, "This is " + i));
        }
    }

    @Override
    public void cancel() {
        // Nothing to do: run() ends on its own after the last element.
    }
});
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
source
    .keyBy(0)
    .window(GlobalWindows.create())
    .trigger(new FinishingCountTrigger<>(5))
    .apply(new WindowFunction<Tuple2<Long, String>, String, Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<String> out) throws Exception {
            // Log the whole batch with the id of the executing thread ...
            long threadId = Thread.currentThread().getId();
            String batch = Joiner.on(",").join(input);
            System.out.println("!!!!!!!!! " + threadId + ": " + batch);
            // ... and forward only the text of the first element of the batch.
            Tuple2<Long, String> first = input.iterator().next();
            out.collect(first.f1);
        }
    })
    .print();
env.execute("yoyoyo");
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment