Last active
November 27, 2015 17:31
-
-
Save s1ck/566796df5f35ee1de6f9 to your computer and use it in GitHub Desktop.
Serialization Issue in Collection Environment
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.GroupReduceFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.tuple.Tuple1; | |
import org.apache.flink.runtime.StreamingMode; | |
import org.apache.flink.test.util.ForkableFlinkMiniCluster; | |
import org.apache.flink.test.util.TestBaseUtils; | |
import org.apache.flink.test.util.TestEnvironment; | |
import org.apache.flink.util.Collector; | |
import org.apache.hadoop.io.WritableComparable; | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.util.UUID; | |
public class ObjectInTuple { | |
public static void main(String[] args) throws Exception { | |
test(getTestEnvironment()); // fails in testGroupByWithCustom and testGroupByWithMax | |
test(ExecutionEnvironment.createLocalEnvironment()); // fails in testGroupByWithCustom and testGroupByWithMax | |
test(ExecutionEnvironment.createCollectionsEnvironment()); // fails in testPrint | |
} | |
public static void test(ExecutionEnvironment env) throws Exception { | |
Tuple1<ID> id1 = new Tuple1<>(new ID()); | |
Tuple1<ID> id2 = new Tuple1<>(new ID()); | |
Tuple1<ID> id3 = new Tuple1<>(new ID()); | |
DataSet<Tuple1<ID>> left = env.fromElements(id1, id2); | |
DataSet<Tuple1<ID>> right = env.fromElements(id2, id3); | |
testPrint(left); | |
testJoin(left, right); | |
testGroupByWithCustom(left); | |
testGroupByWithMax(left); | |
} | |
public static void testPrint(DataSet<Tuple1<ID>> data) throws Exception { | |
System.out.println("*** testPrint"); | |
data.print(); | |
} | |
public static void testJoin(DataSet<Tuple1<ID>> left, DataSet<Tuple1<ID>> right) throws Exception { | |
System.out.println("*** testJoin"); | |
System.out.println(left.join(right).where(0).equalTo(0).collect()); | |
} | |
public static void testGroupByWithCustom(DataSet<Tuple1<ID>> data) throws Exception { | |
System.out.println("*** testGroupByWithCustom"); | |
System.out.println(data.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple1<ID>, ID>() { | |
@Override | |
public void reduce(Iterable<Tuple1<ID>> iterable, Collector<ID> collector) throws Exception { | |
collector.collect(iterable.iterator().next().f0); | |
} | |
}).collect()); | |
} | |
public static void testGroupByWithMax(DataSet<Tuple1<ID>> data) throws Exception { | |
System.out.println("*** testGroupByWithMax"); | |
System.out.println(data.groupBy(0).max(0).collect()); | |
} | |
public static ExecutionEnvironment getTestEnvironment() throws Exception { | |
// do the same as in the flink unit tests | |
ForkableFlinkMiniCluster forkableFlinkMiniCluster = | |
TestBaseUtils.startCluster(1, 4, StreamingMode.BATCH_ONLY, false, false, true); | |
return new TestEnvironment(forkableFlinkMiniCluster, 4); | |
} | |
public static class ID implements WritableComparable<ID> { | |
private UUID uuid; | |
public ID() { | |
this.uuid = UUID.randomUUID(); | |
} | |
@Override | |
public int compareTo(ID o) { | |
return this.uuid.compareTo(o.uuid); | |
} | |
@Override | |
public void write(DataOutput dataOutput) throws IOException { | |
dataOutput.writeLong(uuid.getMostSignificantBits()); | |
dataOutput.writeLong(uuid.getLeastSignificantBits()); | |
} | |
@Override | |
public void readFields(DataInput dataInput) throws IOException { | |
this.uuid = new UUID(dataInput.readLong(), dataInput.readLong()); | |
} | |
@Override | |
public String toString() { | |
return uuid.toString(); | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) { | |
return true; | |
} | |
if (o == null || getClass() != o.getClass()) { | |
return false; | |
} | |
ID id = (ID) o; | |
return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null); | |
} | |
@Override | |
public int hashCode() { | |
return uuid != null ? uuid.hashCode() : 0; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
fixed in apache/flink#1415