-
-
Save vaquarkhan/cb475730118f99e8dd47caf388c1e97c to your computer and use it in GitHub Desktop.
Example of a LEFT OUTER JOIN in Apache Flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.myorg.quickstart; | |
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class LeftOuterJoinExample { | |
public static class LeftOuterJoin implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> { | |
@Override | |
public void coGroup(Iterable<Tuple2<Integer, String>> leftElements, | |
Iterable<Tuple2<Integer, String>> rightElements, | |
Collector<Tuple2<Integer, Integer>> out) throws Exception { | |
final int NULL_ELEMENT = -1; | |
for (Tuple2<Integer, String> leftElem : leftElements) { | |
boolean hadElements = false; | |
for (Tuple2<Integer, String> rightElem : rightElements) { | |
out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0)); | |
hadElements = true; | |
} | |
if (!hadElements) { | |
out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT)); | |
} | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5); | |
DataSet<Tuple2<Integer, String>> leftSide2 = leftSide.map( | |
new MapFunction<Integer, Tuple2<Integer, String>>() { | |
@Override | |
public Tuple2<Integer, String> map(Integer integer) throws Exception { | |
return new Tuple2<Integer, String>(integer, "some data"); | |
} | |
}); | |
DataSource<Integer> rightSide = env.fromElements(4, 5, 6, 7, 8, 9, 10); | |
DataSet<Tuple2<Integer, String>> rightSide2 = rightSide.map( | |
new MapFunction<Integer, Tuple2<Integer, String>>() { | |
@Override | |
public Tuple2<Integer, String> map(Integer integer) throws Exception { | |
return new Tuple2<Integer, String>(integer, "some other data"); | |
} | |
}); | |
DataSet<Tuple2<Integer, Integer>> leftOuterJoin = leftSide2.coGroup(rightSide2) | |
.where(0) | |
.equalTo(0) | |
.with(new LeftOuterJoin()); | |
leftOuterJoin.print(); | |
env.execute(); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment