Created
January 11, 2013 15:40
Revisions
-
btiernay created this gist
Jan 11, 2013 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,211 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.Comparator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.Serializer; import cascading.CascadingException; import cascading.tuple.Comparison; import cascading.tuple.Fields; import cascading.tuple.StreamComparator; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.hadoop.SerializationToken; import cascading.tuple.hadoop.TupleSerialization; import cascading.tuple.hadoop.io.BufferedInputStream; import cascading.tuple.hadoop.io.TupleDeserializer; import cascading.tuple.hadoop.io.TupleSerializer; import com.google.common.base.Throwables; /** * Serialization implementation that is required for Hadoop {@link SequenceFile} serialization. */ @SerializationToken(tokens = { 223 }, classNames = { "cascading.tuple.TupleEntry" }) public class TupleEntrySerialization extends Configured implements Comparison<TupleEntry>, Serialization<TupleEntry> { /** * Delegate for recursively contained {@link Tuple} values. */ private TupleSerialization tupleSerialization; @Override public void setConf(Configuration conf) { super.setConf(conf); this.tupleSerialization = new TupleSerialization(conf); } @Override public boolean accept(Class<?> c) { return TupleEntry.class.isAssignableFrom(c); } @Override public Serializer<TupleEntry> getSerializer(Class<TupleEntry> c) { final TupleSerializer tupleSerializer = (TupleSerializer) tupleSerialization.getSerializer(Tuple.class); return new TupleEntrySerializer(tupleSerializer); } @Override public Deserializer<TupleEntry> getDeserializer(Class<TupleEntry> c) { final TupleDeserializer tupleDeserializer = (TupleDeserializer) tupleSerialization.getDeserializer(Tuple.class); return new TupleEntryDeserializer(tupleDeserializer); } @Override public Comparator<TupleEntry> getComparator(Class<TupleEntry> arg0) { return new TupleEntryComparator(); } /** * {@code TupleEntry} serializer implementation. */ public static class TupleEntrySerializer implements Serializer<TupleEntry> { private DataOutputStream out; private final TupleSerializer delegate; public TupleEntrySerializer(TupleSerializer tupleSerializer) { this.delegate = tupleSerializer; } @Override public void open(OutputStream out) throws IOException { delegate.open(out); if(out instanceof DataOutputStream) { this.out = (DataOutputStream) out; } else { this.out = new DataOutputStream(out); } } @Override public void serialize(TupleEntry t) throws IOException { ObjectOutputStream output = new ObjectOutputStream(out); // Creating a new stream each call seems to be required output.writeObject(t.getFields()); delegate.serialize(t.getTuple()); } @Override public void close() throws IOException { try { delegate.close(); } finally { out.close(); } } } /** * {@code TupleEntry} deserializer implementation. */ public static class TupleEntryDeserializer implements Deserializer<TupleEntry> { private DataInputStream in; private final TupleDeserializer delegate; public TupleEntryDeserializer(TupleDeserializer tupleDeserializer) { this.delegate = tupleDeserializer; } @Override public void open(InputStream in) throws IOException { delegate.open(in); if(in instanceof DataInputStream) { this.in = (DataInputStream) in; } else { this.in = new DataInputStream(in); } } @Override public TupleEntry deserialize(TupleEntry t) throws IOException { ObjectInputStream input = new ObjectInputStream(in); // Creating a new stream each call seems to be required TupleEntry tupleEntry = null; try { Fields fields = (Fields) input.readObject(); Tuple tuple = delegate.deserialize(null); tupleEntry = new TupleEntry(fields, tuple); } catch(ClassNotFoundException e) { Throwables.propagate(e); } return tupleEntry; } @Override public void close() throws IOException { try { delegate.close(); } finally { in.close(); } } } public static class TupleEntryComparator implements StreamComparator<BufferedInputStream>, Comparator<TupleEntry>, Serializable { @Override public int compare(TupleEntry lhs, TupleEntry rhs) { if(lhs == null) { return -1; } if(rhs == null) { return 1; } return 0; } @Override public int compare(BufferedInputStream lhsStream, BufferedInputStream rhsStream) { try { if(lhsStream == null && rhsStream == null) { return 0; } if(lhsStream == null) { return -1; } if(rhsStream == null) { return 1; } String lhsString = WritableUtils.readString(new DataInputStream(lhsStream)); String rhsString = WritableUtils.readString(new DataInputStream(rhsStream)); return lhsString.compareTo(rhsString); } catch(IOException exception) { throw new CascadingException(exception); } } } } This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,92 @@ import static org.fest.assertions.api.Assertions.assertThat; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import org.apache.hadoop.mapred.JobConf; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import cascading.CascadingTestCase; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.hadoop.TupleSerialization; import cascading.tuple.hadoop.io.HadoopTupleInputStream; import cascading.tuple.hadoop.io.HadoopTupleOutputStream; import cascading.tuple.io.TupleInputStream; import cascading.tuple.io.TupleOutputStream; /** * Tests {@code TupleEntrySerialization}. */ public class TupleEntrySerializationTest extends CascadingTestCase { @Rule TemporaryFolder tmp = new TemporaryFolder(); @Test public void test_tuple_entry_serialization() throws IOException { // Setup TupleSerialization tupleSerialization = new TupleSerialization(createJobConf()); File file = tmp.newFile(); // @formatter:off TupleEntry testEntry = new TupleEntry( new Fields("x", "y", "z"), new Tuple( new TupleEntry( new Fields("n1", "n2"), new Tuple(4, 5) ), // x 2, // y 3 // z ) ); // @formatter:on // Output tuple state serialization Tuple outputTuple = writeTuple(tupleSerialization, file, testEntry); // Input for tuple state serialization Tuple inputTuple = readTuple(tupleSerialization, file); // Check the first item in the tuple is a TupleEntry object assertThat(inputTuple.getObject(0)).isInstanceOf(TupleEntry.class); assertThat(inputTuple.toString()).isEqualTo(outputTuple.toString()); } private Tuple writeTuple(TupleSerialization tupleSerialization, File file, TupleEntry tupleEntry) throws FileNotFoundException, IOException { TupleOutputStream output = new HadoopTupleOutputStream(new FileOutputStream(file, false), tupleSerialization.getElementWriter()); Tuple outputTuple = new Tuple(tupleEntry); output.writeTuple(outputTuple); output.close(); return outputTuple; } private Tuple readTuple(TupleSerialization tupleSerialization, File file) throws FileNotFoundException, IOException { TupleInputStream input = new HadoopTupleInputStream(new FileInputStream(file), tupleSerialization.getElementReader()); Tuple inputTuple = input.readTuple(); input.close(); return inputTuple; } private JobConf createJobConf() { JobConf jobConf = new JobConf(); jobConf.set("io.serializations", TupleEntrySerialization.class.getName()); return jobConf; } }