Skip to content

Instantly share code, notes, and snippets.

@krolen
Created April 22, 2016 22:11
Show Gist options
  • Save krolen/9e6ba8b14c54554bfbc10fdfa6fe7308 to your computer and use it in GitHub Desktop.
Save krolen/9e6ba8b14c54554bfbc10fdfa6fe7308 to your computer and use it in GitHub Desktop.
public class MyFinishTrigger<T, W extends Window> extends Trigger<T, W> {
private static final long serialVersionUID = 1L;
private Trigger<T, W> nestedTrigger;
private MyFinishTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
if(timestamp == Long.MAX_VALUE) {
return TriggerResult.FIRE_AND_PURGE;
}
return nestedTrigger.onElement(element, timestamp, window, ctx);
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
if(time == Long.MAX_VALUE) {
return TriggerResult.FIRE_AND_PURGE;
}
return nestedTrigger.onEventTime(time, window, ctx);
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
if(time == Long.MAX_VALUE) {
return TriggerResult.FIRE_AND_PURGE;
}
return nestedTrigger.onProcessingTime(time, window, ctx);
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
nestedTrigger.clear(window, ctx);
}
@Override
public String toString() {
return "PurgingTrigger(" + nestedTrigger.toString() + ")";
}
/**
* Creates a new purging trigger from the given {@code Trigger}.
*
* @param nestedTrigger The trigger that is wrapped by this purging trigger
*/
public static <T, W extends Window> MyFinishTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
return new MyFinishTrigger<>(nestedTrigger);
}
@VisibleForTesting
public Trigger<T, W> getNestedTrigger() {
return nestedTrigger;
}
}
@Test
public void testWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
LongStream.range(0, 33).forEach(l -> {
ctx.collect(Tuple2.of(0L, "This is " + l));
});
}
@Override
public void cancel() {
}
});
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
source.
keyBy(0).window(GlobalWindows.create()).trigger(MyFinishTrigger.of(PurgingTrigger.of(CountTrigger.of(5)))).
apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
}
});
env.execute("yoyoyo");
}
@Test
public void testWindowsWithTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
LongStream.range(0, 33).forEach(l -> {
ctx.collect(Tuple2.of(0L, "This is " + l));
});
}
@Override
public void cancel() {}
});
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
source.
keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).trigger(MyFinishTrigger.of(ProcessingTimeTrigger.create()))
.apply(new WindowFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, String>> input, Collector<Tuple2<Long, String>> out) throws Exception {
System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
}
})
.print();
env.execute("yoyoyo");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment