Created
October 19, 2016 06:31
-
-
Save s1ck/caf9f3f46e7a5afe6f6a73c479948fec to your computer and use it in GitHub Desktop.
Type erasure problem solely on cluster execution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.tuple.Tuple1; | |
import java.lang.reflect.Array; | |
public class Problem { | |
public static class Pojo { | |
} | |
public static class Foo<T> extends Tuple1<T> { | |
} | |
public static class Bar<T> extends Tuple1<T[]> { | |
} | |
public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> { | |
private final Class<T> clazz; | |
public UDF(Class<T> clazz) { | |
this.clazz = clazz; | |
} | |
@Override | |
public Bar<T> map(Foo<T> value) throws Exception { | |
Bar<T> bar = new Bar<>(); | |
//noinspection unchecked | |
bar.f0 = (T[]) Array.newInstance(clazz, 10); | |
return bar; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
// runs in local, collection and cluster execution | |
withLong(); | |
// runs in local and collection execution, fails on cluster execution | |
withPojo(); | |
} | |
public static void withLong() throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
Foo<Long> foo = new Foo<>(); | |
foo.f0 = 42L; | |
DataSet<Foo<Long>> barDataSource = env.fromElements(foo); | |
DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class)); | |
map.print(); | |
} | |
public static void withPojo() throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
Foo<Pojo> foo = new Foo<>(); | |
foo.f0 = new Pojo(); | |
DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo); | |
DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class)); | |
map.print(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.test.util.MultipleProgramsTestBase; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.Parameterized; | |
@RunWith(Parameterized.class) | |
public class ProblemTest extends MultipleProgramsTestBase { | |
public ProblemTest(TestExecutionMode mode) { | |
super(mode); | |
} | |
@Test | |
public void testWithLong() throws Exception { | |
Problem.withLong(); | |
} | |
@Test | |
public void testWithPOJO() throws Exception { | |
Problem.withPojo(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment