Created
April 22, 2016 22:11
-
-
Save krolen/9e6ba8b14c54554bfbc10fdfa6fe7308 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MyFinishTrigger<T, W extends Window> extends Trigger<T, W> { | |
private static final long serialVersionUID = 1L; | |
private Trigger<T, W> nestedTrigger; | |
private MyFinishTrigger(Trigger<T, W> nestedTrigger) { | |
this.nestedTrigger = nestedTrigger; | |
} | |
@Override | |
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { | |
if(timestamp == Long.MAX_VALUE) { | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return nestedTrigger.onElement(element, timestamp, window, ctx); | |
} | |
@Override | |
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { | |
if(time == Long.MAX_VALUE) { | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return nestedTrigger.onEventTime(time, window, ctx); | |
} | |
@Override | |
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { | |
if(time == Long.MAX_VALUE) { | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return nestedTrigger.onProcessingTime(time, window, ctx); | |
} | |
@Override | |
public void clear(W window, TriggerContext ctx) throws Exception { | |
nestedTrigger.clear(window, ctx); | |
} | |
@Override | |
public String toString() { | |
return "PurgingTrigger(" + nestedTrigger.toString() + ")"; | |
} | |
/** | |
* Creates a new purging trigger from the given {@code Trigger}. | |
* | |
* @param nestedTrigger The trigger that is wrapped by this purging trigger | |
*/ | |
public static <T, W extends Window> MyFinishTrigger<T, W> of(Trigger<T, W> nestedTrigger) { | |
return new MyFinishTrigger<>(nestedTrigger); | |
} | |
@VisibleForTesting | |
public Trigger<T, W> getNestedTrigger() { | |
return nestedTrigger; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testWindows() throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() { | |
@Override | |
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { | |
LongStream.range(0, 33).forEach(l -> { | |
ctx.collect(Tuple2.of(0L, "This is " + l)); | |
}); | |
} | |
@Override | |
public void cancel() { | |
} | |
}); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
// source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()). | |
source. | |
keyBy(0).window(GlobalWindows.create()).trigger(MyFinishTrigger.of(PurgingTrigger.of(CountTrigger.of(5)))). | |
apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() { | |
@Override | |
public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception { | |
System.out.println("!!!!!!!!! " + Joiner.on(",").join(input)); | |
} | |
}); | |
env.execute("yoyoyo"); | |
} | |
@Test | |
public void testWindowsWithTime() throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() { | |
@Override | |
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { | |
LongStream.range(0, 33).forEach(l -> { | |
ctx.collect(Tuple2.of(0L, "This is " + l)); | |
}); | |
} | |
@Override | |
public void cancel() {} | |
}); | |
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
// source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()). | |
source. | |
keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).trigger(MyFinishTrigger.of(ProcessingTimeTrigger.create())) | |
.apply(new WindowFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple, TimeWindow>() { | |
@Override | |
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, String>> input, Collector<Tuple2<Long, String>> out) throws Exception { | |
System.out.println("!!!!!!!!! " + Joiner.on(",").join(input)); | |
} | |
}) | |
.print(); | |
env.execute("yoyoyo"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment