Created May 20, 2019 11:40
Save tonvanbart/9f94005a9303ca79f49f96157927b6c7 to your computer and use it in GitHub Desktop.
Flink streaming average example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kpn.datalab.mab; | |
import org.apache.flink.api.common.functions.AggregateFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; | |
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.Random; | |
public class AggregationTestIT { | |
private static final Logger log = LoggerFactory.getLogger(AggregationTestIT.class); | |
@Test | |
public void testPipeline() throws Exception { | |
log.debug("running local pipeline()"); | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
DataStreamSource<MyEvent> events = env.addSource(new MySource()); | |
events.keyBy(myEvent -> myEvent.variant) | |
.window(TumblingProcessingTimeWindows.of(Time.seconds(4))) | |
.aggregate(new AverageAggregator()) | |
.print(); | |
env.execute(); | |
} | |
/** | |
* Aggregation function for average. | |
*/ | |
public static class AverageAggregator implements AggregateFunction<MyEvent, MyAverage, Tuple2<MyAverage, Double>> { | |
@Override | |
public MyAverage createAccumulator() { | |
return new MyAverage(); | |
} | |
@Override | |
public MyAverage add(MyEvent myEvent, MyAverage myAverage) { | |
log.debug("add({},{})", myAverage.variant, myEvent); | |
myAverage.variant = myEvent.variant; | |
myAverage.count = myAverage.count + 1; | |
myAverage.sum = myAverage.sum + myEvent.cev; | |
return myAverage; | |
} | |
@Override | |
public Tuple2<MyAverage, Double> getResult(MyAverage myAverage) { | |
return new Tuple2<>(myAverage, myAverage.sum / myAverage.count); | |
} | |
@Override | |
public MyAverage merge(MyAverage myAverage, MyAverage acc1) { | |
myAverage.sum = myAverage.sum + acc1.sum; | |
myAverage.count = myAverage.count + acc1.count; | |
return myAverage; | |
} | |
} | |
/** | |
* Produce never ending stream of fake updates. | |
*/ | |
public static class MySource extends RichSourceFunction<MyEvent> { | |
private boolean running = true; | |
private String[] variants = new String[] { "var1", "var2" }; | |
private String[] products = new String[] { "prodfoo", "prodBAR", "prod-baz" }; | |
private Random random = new Random(); | |
@Override | |
public void run(SourceContext<MyEvent> sourceContext) throws Exception { | |
while (running) { | |
String variant = variants[random.nextInt(variants.length)]; | |
String product = products[random.nextInt(products.length)]; | |
Double value = random.nextDouble() * 10; | |
sourceContext.collect(new MyEvent(variant, product, value)); | |
Thread.sleep(500); | |
} | |
} | |
@Override | |
public void cancel() { | |
running = false; | |
} | |
} | |
/** | |
* Immutable update event. | |
*/ | |
public static class MyEvent { | |
public final String variant; | |
public final String product; | |
public final Double cev; | |
public MyEvent(String variant, String product, Double cev) { | |
this.variant = variant; | |
this.product = product; | |
this.cev = cev; | |
} | |
@Override | |
public String toString() { | |
return "{" + | |
"'variant' : '" + variant + "\'" + | |
", 'product' : '" + product + "\'" + | |
", 'cev' : " + cev + | |
'}'; | |
} | |
} | |
public static class MyAverage { | |
public String variant; | |
public Integer count = 0; | |
public Double sum = 0d; | |
@Override | |
public String toString() { | |
return "MyAverage{" + | |
"variant='" + variant + '\'' + | |
", count=" + count + | |
", sum=" + sum + | |
'}'; | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.