Created
February 24, 2015 17:09
-
-
Save mxm/d1929b4b69dda87d5c37 to your computer and use it in GitHub Desktop.
Custom Flink Serializer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.flink.mytests.customserializer; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.core.memory.DataInputView; | |
import org.apache.flink.core.memory.DataOutputView; | |
import org.apache.flink.types.Value; | |
import java.io.IOException; | |
import java.util.Arrays; | |
public class CustomSerializer { | |
public static class Vector implements Value { | |
private transient double[] doubleValues; | |
public Vector() { | |
} | |
public Vector(double[] doubleValues) { | |
this.doubleValues = doubleValues; | |
} | |
public double getElement(int position) { | |
return doubleValues[position]; | |
} | |
public void setElement(double value, int position) { | |
doubleValues[position] = value; | |
} | |
public void multiply(int factor) { | |
for (int i = 0; i < doubleValues.length; i++) { | |
doubleValues[i] *= factor; | |
} | |
} | |
@Override | |
public void write(DataOutputView out) throws IOException { | |
out.writeInt(doubleValues.length); | |
for (double value : doubleValues) { | |
out.writeDouble(value); | |
} | |
} | |
@Override | |
public void read(DataInputView in) throws IOException { | |
int length = in.readInt(); | |
double[] array = new double[length]; | |
for (int i = 0; i < length; i++) { | |
array[i] = in.readDouble(); | |
} | |
this.doubleValues = array; | |
} | |
@Override | |
public String toString() { | |
return "Vector{" + | |
"doubleValues=" + Arrays.toString(doubleValues) + | |
'}'; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
Vector[] vectorList = new Vector[1024]; | |
// create some sample data | |
for (int v = 0; v < vectorList.length; v++) { | |
double[] arr = new double[128]; | |
for (int i = 0; i < arr.length; i++) { | |
arr[i] = i * 1.23 * v; | |
} | |
vectorList[v] = new Vector(arr); | |
} | |
// create data set | |
DataSet<Vector> source = env.fromElements(vectorList); | |
// multiply all vectors by 2 | |
DataSet<Vector> ds = source.map(new MapFunction<Vector, Vector>() { | |
private static final long serialVersionUID = -1511665386949403921L; | |
@Override | |
public Vector map(Vector value) throws Exception { | |
value.multiply(2); | |
return value; | |
} | |
}); | |
ds.print(); | |
env.execute(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment