Skip to content

Instantly share code, notes, and snippets.

@btiernay
Created January 11, 2013 15:40

Revisions

  1. btiernay created this gist Jan 11, 2013.
    211 changes: 211 additions & 0 deletions TupleEntrySerialization.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,211 @@

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import java.io.Serializable;
    import java.util.Comparator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.Serialization;
    import org.apache.hadoop.io.serializer.Serializer;

    import cascading.CascadingException;
    import cascading.tuple.Comparison;
    import cascading.tuple.Fields;
    import cascading.tuple.StreamComparator;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;
    import cascading.tuple.hadoop.SerializationToken;
    import cascading.tuple.hadoop.TupleSerialization;
    import cascading.tuple.hadoop.io.BufferedInputStream;
    import cascading.tuple.hadoop.io.TupleDeserializer;
    import cascading.tuple.hadoop.io.TupleSerializer;

    import com.google.common.base.Throwables;

    /**
    * Serialization implementation that is required for Hadoop {@link SequenceFile} serialization.
    */
    @SerializationToken(tokens = { 223 }, classNames = { "cascading.tuple.TupleEntry" })
    public class TupleEntrySerialization extends Configured implements Comparison<TupleEntry>, Serialization<TupleEntry> {

    /**
    * Delegate for recursively contained {@link Tuple} values.
    */
    private TupleSerialization tupleSerialization;

    @Override
    public void setConf(Configuration conf) {
    super.setConf(conf);

    this.tupleSerialization = new TupleSerialization(conf);
    }

    @Override
    public boolean accept(Class<?> c) {
    return TupleEntry.class.isAssignableFrom(c);
    }

    @Override
    public Serializer<TupleEntry> getSerializer(Class<TupleEntry> c) {
    final TupleSerializer tupleSerializer = (TupleSerializer) tupleSerialization.getSerializer(Tuple.class);

    return new TupleEntrySerializer(tupleSerializer);
    }

    @Override
    public Deserializer<TupleEntry> getDeserializer(Class<TupleEntry> c) {
    final TupleDeserializer tupleDeserializer = (TupleDeserializer) tupleSerialization.getDeserializer(Tuple.class);

    return new TupleEntryDeserializer(tupleDeserializer);
    }

    @Override
    public Comparator<TupleEntry> getComparator(Class<TupleEntry> arg0) {
    return new TupleEntryComparator();
    }

    /**
    * {@code TupleEntry} serializer implementation.
    */
    public static class TupleEntrySerializer implements Serializer<TupleEntry> {

    private DataOutputStream out;

    private final TupleSerializer delegate;

    public TupleEntrySerializer(TupleSerializer tupleSerializer) {
    this.delegate = tupleSerializer;
    }

    @Override
    public void open(OutputStream out) throws IOException {
    delegate.open(out);

    if(out instanceof DataOutputStream) {
    this.out = (DataOutputStream) out;
    } else {
    this.out = new DataOutputStream(out);
    }
    }

    @Override
    public void serialize(TupleEntry t) throws IOException {
    ObjectOutputStream output = new ObjectOutputStream(out); // Creating a new stream each call seems to be required
    output.writeObject(t.getFields());

    delegate.serialize(t.getTuple());
    }

    @Override
    public void close() throws IOException {
    try {
    delegate.close();
    } finally {
    out.close();
    }
    }

    }

    /**
    * {@code TupleEntry} deserializer implementation.
    */
    public static class TupleEntryDeserializer implements Deserializer<TupleEntry> {

    private DataInputStream in;

    private final TupleDeserializer delegate;

    public TupleEntryDeserializer(TupleDeserializer tupleDeserializer) {
    this.delegate = tupleDeserializer;
    }

    @Override
    public void open(InputStream in) throws IOException {
    delegate.open(in);

    if(in instanceof DataInputStream) {
    this.in = (DataInputStream) in;
    } else {
    this.in = new DataInputStream(in);
    }
    }

    @Override
    public TupleEntry deserialize(TupleEntry t) throws IOException {
    ObjectInputStream input = new ObjectInputStream(in); // Creating a new stream each call seems to be required
    TupleEntry tupleEntry = null;
    try {
    Fields fields = (Fields) input.readObject();
    Tuple tuple = delegate.deserialize(null);

    tupleEntry = new TupleEntry(fields, tuple);
    } catch(ClassNotFoundException e) {
    Throwables.propagate(e);
    }

    return tupleEntry;
    }

    @Override
    public void close() throws IOException {
    try {
    delegate.close();
    } finally {
    in.close();
    }
    }

    }

    public static class TupleEntryComparator implements StreamComparator<BufferedInputStream>, Comparator<TupleEntry>,
    Serializable {

    @Override
    public int compare(TupleEntry lhs, TupleEntry rhs) {
    if(lhs == null) {
    return -1;
    }

    if(rhs == null) {
    return 1;
    }

    return 0;
    }

    @Override
    public int compare(BufferedInputStream lhsStream, BufferedInputStream rhsStream) {
    try {
    if(lhsStream == null && rhsStream == null) {
    return 0;
    }

    if(lhsStream == null) {
    return -1;
    }

    if(rhsStream == null) {
    return 1;
    }

    String lhsString = WritableUtils.readString(new DataInputStream(lhsStream));
    String rhsString = WritableUtils.readString(new DataInputStream(rhsStream));

    return lhsString.compareTo(rhsString);
    } catch(IOException exception) {
    throw new CascadingException(exception);
    }
    }

    }

    }
    92 changes: 92 additions & 0 deletions TupleEntrySerializationTest
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,92 @@

    import static org.fest.assertions.api.Assertions.assertThat;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    import cascading.CascadingTestCase;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;
    import cascading.tuple.hadoop.TupleSerialization;
    import cascading.tuple.hadoop.io.HadoopTupleInputStream;
    import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
    import cascading.tuple.io.TupleInputStream;
    import cascading.tuple.io.TupleOutputStream;

    /**
    * Tests {@code TupleEntrySerialization}.
    */
    public class TupleEntrySerializationTest extends CascadingTestCase {

    @Rule
    TemporaryFolder tmp = new TemporaryFolder();

    @Test
    public void test_tuple_entry_serialization() throws IOException {
    // Setup
    TupleSerialization tupleSerialization = new TupleSerialization(createJobConf());
    File file = tmp.newFile();

    // @formatter:off
    TupleEntry testEntry =
    new TupleEntry(
    new Fields("x", "y", "z"),
    new Tuple(
    new TupleEntry(
    new Fields("n1", "n2"),
    new Tuple(4, 5)
    ), // x
    2, // y
    3 // z
    )
    );
    // @formatter:on

    // Output tuple state serialization
    Tuple outputTuple = writeTuple(tupleSerialization, file, testEntry);

    // Input for tuple state serialization
    Tuple inputTuple = readTuple(tupleSerialization, file);

    // Check the first item in the tuple is a TupleEntry object
    assertThat(inputTuple.getObject(0)).isInstanceOf(TupleEntry.class);
    assertThat(inputTuple.toString()).isEqualTo(outputTuple.toString());
    }

    private Tuple writeTuple(TupleSerialization tupleSerialization, File file, TupleEntry tupleEntry)
    throws FileNotFoundException, IOException {
    TupleOutputStream output =
    new HadoopTupleOutputStream(new FileOutputStream(file, false), tupleSerialization.getElementWriter());
    Tuple outputTuple = new Tuple(tupleEntry);
    output.writeTuple(outputTuple);
    output.close();

    return outputTuple;
    }

    private Tuple readTuple(TupleSerialization tupleSerialization, File file) throws FileNotFoundException, IOException {
    TupleInputStream input =
    new HadoopTupleInputStream(new FileInputStream(file), tupleSerialization.getElementReader());
    Tuple inputTuple = input.readTuple();
    input.close();

    return inputTuple;
    }

    private JobConf createJobConf() {
    JobConf jobConf = new JobConf();
    jobConf.set("io.serializations", TupleEntrySerialization.class.getName());

    return jobConf;
    }

    }