Forked from saarw/MultiSourceWindowCoGroupTest.java
Last active: November 23, 2016 14:40
-
-
Save mxm/a1d6b22c772971c98e2ce886dc9818b1 to your computer and use it in GitHub Desktop.
MultiSourceWindowCoGroupTest
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.annotations.VisibleForTesting; | |
import org.apache.flink.api.common.functions.CoGroupFunction; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.common.functions.ReduceFunction; | |
import org.apache.flink.api.common.state.ReducingState; | |
import org.apache.flink.api.common.state.ReducingStateDescriptor; | |
import org.apache.flink.api.common.typeutils.base.LongSerializer; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.api.java.tuple.Tuple; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; | |
import org.apache.flink.streaming.api.functions.windowing.WindowFunction; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger; | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.streaming.api.windowing.windows.Window; | |
import org.apache.flink.util.Collector; | |
import static org.apache.flink.streaming.api.windowing.time.Time.milliseconds; | |
public class William2 { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
DataStream<Integer> integerDataStreamSource = env | |
.fromElements(1, 2, 3, 4) | |
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { | |
@Override | |
public long extractTimestamp(Integer element, long previousElementTimestamp) { | |
return (long) element; | |
} | |
@Override | |
public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) { | |
return new Watermark(lastElement); | |
} | |
}); | |
DataStream<Integer> integerDataStreamSource2 = env | |
.fromElements(1, 2, 3, 4) | |
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { | |
@Override | |
public long extractTimestamp(Integer element, long previousElementTimestamp) { | |
return (long) element; | |
} | |
@Override | |
public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) { | |
return new Watermark(lastElement); | |
} | |
}) | |
.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() { | |
@Override | |
public Tuple2<Integer, Integer> map(Integer value) throws Exception { | |
return new Tuple2<>(1, value); | |
} | |
}) | |
.keyBy(0) | |
.window(TumblingEventTimeWindows.of(milliseconds(10))) | |
.trigger(ContinuousEventTimeTrigger.of(milliseconds(1))) | |
.apply(new WindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, TimeWindow>() { | |
@Override | |
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Integer, Integer>> input, Collector<Integer> out) throws Exception { | |
System.out.println("tiggering source 2"); | |
for (Tuple2<Integer, Integer> val : input) { | |
System.out.println(val); | |
out.collect(val.f1); | |
} | |
} | |
}); | |
integerDataStreamSource | |
.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() { | |
@Override | |
public Tuple2<Integer, Integer> map(Integer value) throws Exception { | |
return new Tuple2<>(1, value); | |
} | |
}) | |
.keyBy(0) | |
.window(TumblingEventTimeWindows.of(milliseconds(10))) | |
.trigger(ContinuousEventTimeTrigger.of(milliseconds(1))) | |
.apply(new WindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, TimeWindow>() { | |
@Override | |
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Integer, Integer>> input, Collector<Integer> out) throws Exception { | |
System.out.println("tiggering source 1"); | |
for (Tuple2<Integer, Integer> val : input) { | |
System.out.println(val); | |
out.collect(val.f1); | |
} | |
} | |
}) | |
.coGroup(integerDataStreamSource2) | |
.where(new KeySelector<Integer, Integer>() { | |
@Override | |
public Integer getKey(Integer value) throws Exception { | |
return value; | |
} | |
}) | |
.equalTo(new KeySelector<Integer, Integer>() { | |
@Override | |
public Integer getKey(Integer value) throws Exception { | |
return value; | |
} | |
}) | |
.window(TumblingEventTimeWindows.of(milliseconds(10))) | |
.trigger(MyContinuousEventTimeTrigger.of(milliseconds(1))) | |
.apply(new CoGroupFunction<Integer, Integer, Object>() { | |
@Override | |
public void coGroup(Iterable<Integer> first, Iterable<Integer> second, Collector<Object> out) throws Exception { | |
System.out.println("trigger cogroup"); | |
System.out.println("first"); | |
for (int val : first){ | |
System.out.println(val); | |
} | |
System.out.println("second"); | |
for (int val : second){ | |
System.out.println(val); | |
} | |
} | |
}); | |
env.execute(); | |
} | |
public static class MyContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> { | |
private static final long serialVersionUID = 1L; | |
private final long interval; | |
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */ | |
private final ReducingStateDescriptor<Long> stateDesc = | |
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); | |
private MyContinuousEventTimeTrigger(long interval) { | |
this.interval = interval; | |
} | |
@Override | |
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { | |
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { | |
// if the watermark is already past the window fire immediately | |
return TriggerResult.FIRE; | |
} else { | |
ctx.registerEventTimeTimer(window.maxTimestamp()); | |
} | |
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); | |
if (fireTimestamp.get() == null) { | |
long start = timestamp - (timestamp % interval); | |
long nextFireTimestamp = start + interval; | |
ctx.registerEventTimeTimer(nextFireTimestamp); | |
fireTimestamp.add(nextFireTimestamp); | |
return TriggerResult.CONTINUE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { | |
if (time == window.maxTimestamp()){ | |
return TriggerResult.FIRE; | |
} | |
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); | |
if (fireTimestamp.get() <= time) { | |
fireTimestamp.clear(); | |
fireTimestamp.add(time + interval); | |
ctx.registerEventTimeTimer(time + interval); | |
return TriggerResult.FIRE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear(W window, TriggerContext ctx) throws Exception { | |
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); | |
long timestamp = fireTimestamp.get(); | |
ctx.deleteEventTimeTimer(timestamp); | |
fireTimestamp.clear(); | |
} | |
@Override | |
public boolean canMerge() { | |
return true; | |
} | |
@Override | |
public TriggerResult onMerge(W window, OnMergeContext ctx) { | |
ctx.mergePartitionedState(stateDesc); | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public String toString() { | |
return "MyContinuousProcessingTimeTrigger(" + interval + ")"; | |
} | |
@VisibleForTesting | |
public long getInterval() { | |
return interval; | |
} | |
/** | |
* Creates a trigger that continuously fires based on the given interval. | |
* | |
* @param interval The time interval at which to fire. | |
* @param <W> The type of {@link Window Windows} on which this trigger can operate. | |
*/ | |
public static <W extends Window> MyContinuousEventTimeTrigger<W> of(Time interval) { | |
return new MyContinuousEventTimeTrigger<>(interval.toMilliseconds()); | |
} | |
private static class Min implements ReduceFunction<Long> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Long reduce(Long value1, Long value2) throws Exception { | |
return Math.min(value1, value2); | |
} | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.