Skip to content

Instantly share code, notes, and snippets.

@tonvanbart
Created May 20, 2019 11:40
Show Gist options
Save tonvanbart/9f94005a9303ca79f49f96157927b6c7 to your computer and use it in GitHub Desktop.
Flink streaming average example
package com.kpn.datalab.mab;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
/**
 * Local integration test that runs a small Flink pipeline computing, per {@code variant}, the
 * average {@code cev} of randomly generated events over 4-second tumbling processing-time windows.
 *
 * <p>Note: {@link MySource} never finishes on its own, so {@link #testPipeline()} keeps running
 * (printing one result per variant per window) until the job is cancelled externally.
 */
public class AggregationTestIT {
    private static final Logger log = LoggerFactory.getLogger(AggregationTestIT.class);

    /**
     * Builds and executes the pipeline: events from {@link MySource} are keyed by
     * {@code variant}, grouped into 4-second tumbling processing-time windows, reduced with
     * {@link AverageAggregator}, and printed to stdout.
     */
    @Test
    public void testPipeline() throws Exception {
        log.debug("running local pipeline()");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<MyEvent> events = env.addSource(new MySource());
        events.keyBy(myEvent -> myEvent.variant)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(4)))
                .aggregate(new AverageAggregator())
                .print();
        env.execute();
    }

    /**
     * Aggregation function for the average of {@link MyEvent#cev}.
     *
     * <p>The accumulator is a mutable {@link MyAverage} that holds the running sum and count.
     * The result pairs the final accumulator with {@code sum / count}.
     */
    public static class AverageAggregator
            implements AggregateFunction<MyEvent, MyAverage, Tuple2<MyAverage, Double>> {

        /** Creates an empty accumulator (count 0, sum 0.0, no variant yet). */
        @Override
        public MyAverage createAccumulator() {
            return new MyAverage();
        }

        /**
         * Folds one event into the accumulator.
         *
         * @param myEvent the incoming event; its {@code cev} is added to the running sum
         * @param myAverage the accumulator, updated in place
         * @return the same accumulator instance
         */
        @Override
        public MyAverage add(MyEvent myEvent, MyAverage myAverage) {
            log.debug("add({},{})", myAverage.variant, myEvent);
            myAverage.variant = myEvent.variant;
            myAverage.count = myAverage.count + 1;
            myAverage.sum = myAverage.sum + myEvent.cev;
            return myAverage;
        }

        /**
         * Produces the window result.
         *
         * @param myAverage the final accumulator
         * @return the accumulator paired with {@code sum / count}; for an empty accumulator
         *     (count 0) this is {@code NaN}, because the division is done in floating point
         */
        @Override
        public Tuple2<MyAverage, Double> getResult(MyAverage myAverage) {
            return new Tuple2<>(myAverage, myAverage.sum / myAverage.count);
        }

        /**
         * Merges {@code acc1} into {@code myAverage}, reusing {@code myAverage} as the result.
         *
         * @param myAverage the target accumulator, updated in place
         * @param acc1 the accumulator to merge in
         * @return {@code myAverage} holding the combined sum and count
         */
        @Override
        public MyAverage merge(MyAverage myAverage, MyAverage acc1) {
            // Keep the variant when the target accumulator has not seen any event yet;
            // otherwise the merged result would report a null variant.
            if (myAverage.variant == null) {
                myAverage.variant = acc1.variant;
            }
            myAverage.sum = myAverage.sum + acc1.sum;
            myAverage.count = myAverage.count + acc1.count;
            return myAverage;
        }
    }

    /**
     * Produces a never-ending stream of fake updates.
     *
     * <p>Every 500 ms it emits one event with a random variant, a random product and a random
     * {@code cev} in [0, 10). It stops only when {@link #cancel()} is called, or when the
     * sleeping thread is interrupted.
     */
    public static class MySource extends RichSourceFunction<MyEvent> {
        // volatile: cancel() is called from a different thread than run(). Without it, the
        // loop in run() is not guaranteed to observe the update and might never terminate.
        private volatile boolean running = true;
        private final String[] variants = new String[] { "var1", "var2" };
        private final String[] products = new String[] { "prodfoo", "prodBAR", "prod-baz" };
        private final Random random = new Random();

        /** Emits random events until {@link #cancel()} clears the running flag. */
        @Override
        public void run(SourceContext<MyEvent> sourceContext) throws Exception {
            while (running) {
                String variant = variants[random.nextInt(variants.length)];
                String product = products[random.nextInt(products.length)];
                Double value = random.nextDouble() * 10;
                sourceContext.collect(new MyEvent(variant, product, value));
                Thread.sleep(500);
            }
        }

        /** Signals {@link #run} to exit after its current iteration. */
        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Immutable update event.
     */
    public static class MyEvent {
        public final String variant;
        public final String product;
        public final Double cev;

        /**
         * @param variant key used to group events into averages
         * @param product product name (carried along, not aggregated)
         * @param cev numeric value that is averaged
         */
        public MyEvent(String variant, String product, Double cev) {
            this.variant = variant;
            this.product = product;
            this.cev = cev;
        }

        @Override
        public String toString() {
            return "{" +
                    "'variant' : '" + variant + "'" +
                    ", 'product' : '" + product + "'" +
                    ", 'cev' : " + cev +
                    '}';
        }
    }

    /**
     * Mutable accumulator for {@link AverageAggregator}: the variant seen, plus the running
     * count and sum of {@code cev} values.
     */
    public static class MyAverage {
        public String variant;
        public Integer count = 0;
        public Double sum = 0d;

        @Override
        public String toString() {
            return "MyAverage{" +
                    "variant='" + variant + '\'' +
                    ", count=" + count +
                    ", sum=" + sum +
                    '}';
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment