Created
October 10, 2016 11:26
-
-
Save s1ck/37aefb19198cd01a8b998fab354c2cfd to your computer and use it in GitHub Desktop.
TypeProblem when using RichFlatMapFunction on GenericArray Types
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.FlatJoinFunction; | |
import org.apache.flink.api.common.functions.RichFlatJoinFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.operators.DataSource; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.util.Collector; | |
public class TypeProblem { | |
/** | |
* Note that both cases work with {@code extends Tuple2<K, K[]>} | |
* | |
* @param <K> | |
*/ | |
public static class Foo<K> extends Tuple2<K[], K> { | |
public Foo() { | |
} | |
public Foo(K[] value0, K value1) { | |
super(value0, value1); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
Foo<Long> myFoo = new Foo<>(new Long[] { 0L, 1L, 2L }, 0L); | |
works(env, myFoo, 1).print(); | |
worksNot(env, myFoo, 1).print(); | |
} | |
private static <T> DataSet<Foo<T>> works(ExecutionEnvironment env, Foo<T> foo, int field) { | |
DataSource<Foo<T>> fooDataSource = env.fromElements(foo); | |
return fooDataSource.join(fooDataSource) | |
.where(field).equalTo(field) | |
.with(new FlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() { | |
@Override | |
public void join(Foo<T> first, Foo<T> second, | |
Collector<Foo<T>> out) throws Exception { | |
out.collect(first); | |
} | |
}); | |
} | |
private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo, int field) { | |
DataSource<Foo<T>> fooDataSource = env.fromElements(foo); | |
return fooDataSource.join(fooDataSource) | |
.where(field).equalTo(field) | |
.with(new RichFlatJoinFunction<Foo<T>, Foo<T>, Foo<T>>() { | |
@Override | |
public void join(Foo<T> first, Foo<T> second, | |
Collector<Foo<T>> out) throws Exception { | |
out.collect(first); | |
} | |
}); | |
} | |
} |
Same exception for RichFlatMapFunction:
/**
 * Flat-map variant of the reproduction: passes every element of a single-element data set
 * through an anonymous {@link RichFlatMapFunction} that forwards its input unchanged.
 *
 * <p>Reported alongside this snippet as raising the same exception as the join-based variant.
 *
 * @param env execution environment used to create the data source
 * @param foo the only element of the data set
 * @param <T> element type of {@code foo}
 * @return the data set produced by the flat-map
 */
private static <T> DataSet<Foo<T>> worksNot(ExecutionEnvironment env, Foo<T> foo) {
  DataSource<Foo<T>> source = env.fromElements(foo);

  // Identity flat-map: each incoming element is emitted exactly once.
  RichFlatMapFunction<Foo<T>, Foo<T>> forwardEach = new RichFlatMapFunction<Foo<T>, Foo<T>>() {
    @Override
    public void flatMap(Foo<T> element, Collector<Foo<T>> collector) throws Exception {
      collector.collect(element);
    }
  };

  return source.flatMap(forwardEach);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Leads to