Skip to content

Instantly share code, notes, and snippets.

@s1ck
Last active November 27, 2015 17:31
Show Gist options
  • Save s1ck/566796df5f35ee1de6f9 to your computer and use it in GitHub Desktop.
Save s1ck/566796df5f35ee1de6f9 to your computer and use it in GitHub Desktop.
Serialization Issue in Collection Environment
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.UUID;
/**
 * Small reproduction program that runs the same set of DataSet operations
 * (print, join, two grouped reductions) on tuples wrapping a custom
 * {@link ID} key type, once per execution environment.
 */
public class ObjectInTuple {

    /**
     * Runs {@link #test(ExecutionEnvironment)} against three environments in
     * sequence. The trailing comments record the failures noted by the
     * original author for each environment.
     *
     * @param args unused
     * @throws Exception if creating an environment or running a test fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment clusterEnv = getTestEnvironment();
        test(clusterEnv); // fails in testGroupByWithCustom and testGroupByWithMax

        ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
        test(localEnv); // fails in testGroupByWithCustom and testGroupByWithMax

        ExecutionEnvironment collectionsEnv = ExecutionEnvironment.createCollectionsEnvironment();
        test(collectionsEnv); // fails in testPrint
    }

    /**
     * Builds two overlapping two-element data sets of {@code Tuple1<ID>} and
     * feeds them through every scenario in this class.
     *
     * @param env environment used to create the data sets
     * @throws Exception if any scenario fails
     */
    public static void test(ExecutionEnvironment env) throws Exception {
        Tuple1<ID> first = new Tuple1<>(new ID());
        Tuple1<ID> shared = new Tuple1<>(new ID());
        Tuple1<ID> last = new Tuple1<>(new ID());

        // 'shared' is present on both sides so the join has a matching key.
        DataSet<Tuple1<ID>> left = env.fromElements(first, shared);
        DataSet<Tuple1<ID>> right = env.fromElements(shared, last);

        testPrint(left);
        testJoin(left, right);
        testGroupByWithCustom(left);
        testGroupByWithMax(left);
    }

    /**
     * Prints the given data set via {@code DataSet.print()}.
     *
     * @param data data set to print
     * @throws Exception if printing fails
     */
    public static void testPrint(DataSet<Tuple1<ID>> data) throws Exception {
        System.out.println("*** testPrint");
        data.print();
    }

    /**
     * Joins both data sets on tuple field 0 and prints the collected result.
     *
     * @param left  left join input
     * @param right right join input
     * @throws Exception if the join or the collect fails
     */
    public static void testJoin(DataSet<Tuple1<ID>> left, DataSet<Tuple1<ID>> right) throws Exception {
        System.out.println("*** testJoin");
        DataSet<?> joined = left.join(right).where(0).equalTo(0);
        System.out.println(joined.collect());
    }

    /**
     * Groups by tuple field 0, reduces each group with a custom function that
     * emits the {@link ID} of the group's first element, and prints the result.
     *
     * @param data data set to group
     * @throws Exception if the grouping, reduction or collect fails
     */
    public static void testGroupByWithCustom(DataSet<Tuple1<ID>> data) throws Exception {
        System.out.println("*** testGroupByWithCustom");

        GroupReduceFunction<Tuple1<ID>, ID> firstOfGroup = new GroupReduceFunction<Tuple1<ID>, ID>() {
            @Override
            public void reduce(Iterable<Tuple1<ID>> values, Collector<ID> out) throws Exception {
                Tuple1<ID> head = values.iterator().next();
                out.collect(head.f0);
            }
        };

        DataSet<ID> reduced = data.groupBy(0).reduceGroup(firstOfGroup);
        System.out.println(reduced.collect());
    }

    /**
     * Groups by tuple field 0, applies the built-in {@code max} aggregation on
     * field 0, and prints the collected result.
     *
     * @param data data set to group
     * @throws Exception if the grouping, aggregation or collect fails
     */
    public static void testGroupByWithMax(DataSet<Tuple1<ID>> data) throws Exception {
        System.out.println("*** testGroupByWithMax");
        DataSet<Tuple1<ID>> maxima = data.groupBy(0).max(0);
        System.out.println(maxima.collect());
    }

    /**
     * Starts a mini cluster through {@code TestBaseUtils.startCluster} and
     * wraps it in a {@link TestEnvironment}.
     *
     * @return environment backed by the freshly started cluster
     * @throws Exception if the cluster cannot be started
     */
    public static ExecutionEnvironment getTestEnvironment() throws Exception {
        // do the same as in the flink unit tests
        ForkableFlinkMiniCluster miniCluster = TestBaseUtils.startCluster(
            1, 4, StreamingMode.BATCH_ONLY, false, false, true);
        ExecutionEnvironment environment = new TestEnvironment(miniCluster, 4);
        return environment;
    }

    /**
     * Key type wrapping a {@link UUID}. It is written as two longs (most
     * significant bits first) and ordered, compared and hashed by the UUID.
     */
    public static class ID implements WritableComparable<ID> {

        private UUID uuid;

        /** Creates an identifier holding a freshly generated random UUID. */
        public ID() {
            this.uuid = UUID.randomUUID();
        }

        /** Orders identifiers by their UUID. */
        @Override
        public int compareTo(ID o) {
            return uuid.compareTo(o.uuid);
        }

        /** Writes the most significant bits, then the least significant bits. */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            long high = uuid.getMostSignificantBits();
            long low = uuid.getLeastSignificantBits();
            dataOutput.writeLong(high);
            dataOutput.writeLong(low);
        }

        /** Replaces the UUID with one rebuilt from two longs, in write order. */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            long high = dataInput.readLong();
            long low = dataInput.readLong();
            this.uuid = new UUID(high, low);
        }

        @Override
        public String toString() {
            return uuid.toString();
        }

        /** Two identifiers are equal when they are the same class and hold equal UUIDs. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            ID other = (ID) o;
            if (uuid == null) {
                return other.uuid == null;
            }
            return uuid.equals(other.uuid);
        }

        @Override
        public int hashCode() {
            if (uuid == null) {
                return 0;
            }
            return uuid.hashCode();
        }
    }
}
@s1ck
Copy link
Author

s1ck commented Nov 27, 2015

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment