@rmetzger
Created March 31, 2015 12:35
Difference Set in Flink
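import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Class name is illustrative; the original snippet shows only the main method.
public class DifferenceSet {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // left input: (key, value) pairs
        DataSet<Tuple2<Integer, String>> one = env.fromElements(
            new Tuple2<Integer, String>(1, "one"),
            new Tuple2<Integer, String>(2, "two"),
            new Tuple2<Integer, String>(3, "three"));
        // right input: the keys to subtract from the left input
        DataSet<Tuple1<Integer>> two = env.fromElements(new Tuple1<Integer>(1));
        // co-group both inputs on field 0 and keep only the keys that have no match in "two"
        DataSet<Integer> three = one.coGroup(two).where(0).equalTo(0)
            .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple1<Integer>, Integer>() {
                @Override
                public void coGroup(Iterable<Tuple2<Integer, String>> left, Iterable<Tuple1<Integer>> right, Collector<Integer> out) throws Exception {
                    // an empty right group means the key occurs only in "one"
                    if (!right.iterator().hasNext()) {
                        for (Tuple2<Integer, String> el : left) {
                            out.collect(el.f0);
                        }
                    }
                }
            });
        // emits the keys 2 and 3 (output order is not guaranteed)
        three.print();
        // execute program; on Flink 0.9 and later print() triggers execution itself, so remove this call there
        env.execute("Flink Difference Set");
    }
}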
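
For comparison, the same difference semantics can be sketched in plain Java without a Flink cluster. This is only an illustration under stated assumptions: the class `DifferenceSetSketch` and its method `difference` are hypothetical names, not part of Flink, and the sketch works on in-memory collections of keys rather than on a distributed DataSet.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink: a single-JVM illustration of the difference.
final class DifferenceSetSketch {

    // Returns every key from leftKeys that does not occur in rightKeys,
    // preserving the input order and any duplicates of the left side.
    static List<Integer> difference(List<Integer> leftKeys, Collection<Integer> rightKeys) {
        Set<Integer> seenOnRight = new HashSet<>(rightKeys);
        List<Integer> result = new ArrayList<>();
        for (Integer key : leftKeys) {
            // Mirrors the "right group is empty" check in the coGroup function.
            if (!seenOnRight.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }
}
```

Calling `DifferenceSetSketch.difference(Arrays.asList(1, 2, 3), Arrays.asList(1))` returns the list `[2, 3]`, the same keys the Flink program emits. Unlike the co-group, this version keeps the left input's order, which a distributed job does not guarantee.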