Created
January 11, 2013 15:40
-
-
Save btiernay/4511605 to your computer and use it in GitHub Desktop.
Custom TupleEntrySerialization implementation that complements the behavior of TupleSerialization. It works by (de)serializing the Fields field and then delegating to a supplied TupleSerialization instance for the Tuple field. Thus, it is possible to nest TupleEntries / Tuples into arbitrary trees and serialize them using Hadoop. This is an exten…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.DataInputStream; | |
import java.io.DataOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.ObjectInputStream; | |
import java.io.ObjectOutputStream; | |
import java.io.OutputStream; | |
import java.io.Serializable; | |
import java.util.Comparator; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.io.WritableUtils; | |
import org.apache.hadoop.io.serializer.Deserializer; | |
import org.apache.hadoop.io.serializer.Serialization; | |
import org.apache.hadoop.io.serializer.Serializer; | |
import cascading.CascadingException; | |
import cascading.tuple.Comparison; | |
import cascading.tuple.Fields; | |
import cascading.tuple.StreamComparator; | |
import cascading.tuple.Tuple; | |
import cascading.tuple.TupleEntry; | |
import cascading.tuple.hadoop.SerializationToken; | |
import cascading.tuple.hadoop.TupleSerialization; | |
import cascading.tuple.hadoop.io.BufferedInputStream; | |
import cascading.tuple.hadoop.io.TupleDeserializer; | |
import cascading.tuple.hadoop.io.TupleSerializer; | |
import com.google.common.base.Throwables; | |
/** | |
* Serialization implementation that is required for Hadoop {@link org.apache.hadoop.io.SequenceFile} serialization.
*/ | |
@SerializationToken(tokens = { 223 }, classNames = { "cascading.tuple.TupleEntry" }) | |
public class TupleEntrySerialization extends Configured implements Comparison<TupleEntry>, Serialization<TupleEntry> { | |
/** | |
* Delegate for recursively contained {@link Tuple} values. | |
*/ | |
private TupleSerialization tupleSerialization; | |
@Override | |
public void setConf(Configuration conf) { | |
super.setConf(conf); | |
this.tupleSerialization = new TupleSerialization(conf); | |
} | |
@Override | |
public boolean accept(Class<?> c) { | |
return TupleEntry.class.isAssignableFrom(c); | |
} | |
@Override | |
public Serializer<TupleEntry> getSerializer(Class<TupleEntry> c) { | |
final TupleSerializer tupleSerializer = (TupleSerializer) tupleSerialization.getSerializer(Tuple.class); | |
return new TupleEntrySerializer(tupleSerializer); | |
} | |
@Override | |
public Deserializer<TupleEntry> getDeserializer(Class<TupleEntry> c) { | |
final TupleDeserializer tupleDeserializer = (TupleDeserializer) tupleSerialization.getDeserializer(Tuple.class); | |
return new TupleEntryDeserializer(tupleDeserializer); | |
} | |
@Override | |
public Comparator<TupleEntry> getComparator(Class<TupleEntry> arg0) { | |
return new TupleEntryComparator(); | |
} | |
/** | |
* {@code TupleEntry} serializer implementation. | |
*/ | |
public static class TupleEntrySerializer implements Serializer<TupleEntry> { | |
private DataOutputStream out; | |
private final TupleSerializer delegate; | |
public TupleEntrySerializer(TupleSerializer tupleSerializer) { | |
this.delegate = tupleSerializer; | |
} | |
@Override | |
public void open(OutputStream out) throws IOException { | |
delegate.open(out); | |
if(out instanceof DataOutputStream) { | |
this.out = (DataOutputStream) out; | |
} else { | |
this.out = new DataOutputStream(out); | |
} | |
} | |
@Override | |
public void serialize(TupleEntry t) throws IOException { | |
ObjectOutputStream output = new ObjectOutputStream(out); // Creating a new stream each call seems to be required | |
output.writeObject(t.getFields()); | |
delegate.serialize(t.getTuple()); | |
} | |
@Override | |
public void close() throws IOException { | |
try { | |
delegate.close(); | |
} finally { | |
out.close(); | |
} | |
} | |
} | |
/** | |
* {@code TupleEntry} deserializer implementation. | |
*/ | |
public static class TupleEntryDeserializer implements Deserializer<TupleEntry> { | |
private DataInputStream in; | |
private final TupleDeserializer delegate; | |
public TupleEntryDeserializer(TupleDeserializer tupleDeserializer) { | |
this.delegate = tupleDeserializer; | |
} | |
@Override | |
public void open(InputStream in) throws IOException { | |
delegate.open(in); | |
if(in instanceof DataInputStream) { | |
this.in = (DataInputStream) in; | |
} else { | |
this.in = new DataInputStream(in); | |
} | |
} | |
@Override | |
public TupleEntry deserialize(TupleEntry t) throws IOException { | |
ObjectInputStream input = new ObjectInputStream(in); // Creating a new stream each call seems to be required | |
TupleEntry tupleEntry = null; | |
try { | |
Fields fields = (Fields) input.readObject(); | |
Tuple tuple = delegate.deserialize(null); | |
tupleEntry = new TupleEntry(fields, tuple); | |
} catch(ClassNotFoundException e) { | |
Throwables.propagate(e); | |
} | |
return tupleEntry; | |
} | |
@Override | |
public void close() throws IOException { | |
try { | |
delegate.close(); | |
} finally { | |
in.close(); | |
} | |
} | |
} | |
public static class TupleEntryComparator implements StreamComparator<BufferedInputStream>, Comparator<TupleEntry>, | |
Serializable { | |
@Override | |
public int compare(TupleEntry lhs, TupleEntry rhs) { | |
if(lhs == null) { | |
return -1; | |
} | |
if(rhs == null) { | |
return 1; | |
} | |
return 0; | |
} | |
@Override | |
public int compare(BufferedInputStream lhsStream, BufferedInputStream rhsStream) { | |
try { | |
if(lhsStream == null && rhsStream == null) { | |
return 0; | |
} | |
if(lhsStream == null) { | |
return -1; | |
} | |
if(rhsStream == null) { | |
return 1; | |
} | |
String lhsString = WritableUtils.readString(new DataInputStream(lhsStream)); | |
String rhsString = WritableUtils.readString(new DataInputStream(rhsStream)); | |
return lhsString.compareTo(rhsString); | |
} catch(IOException exception) { | |
throw new CascadingException(exception); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.fest.assertions.api.Assertions.assertThat; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.FileNotFoundException; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.TemporaryFolder; | |
import cascading.CascadingTestCase; | |
import cascading.tuple.Fields; | |
import cascading.tuple.Tuple; | |
import cascading.tuple.TupleEntry; | |
import cascading.tuple.hadoop.TupleSerialization; | |
import cascading.tuple.hadoop.io.HadoopTupleInputStream; | |
import cascading.tuple.hadoop.io.HadoopTupleOutputStream; | |
import cascading.tuple.io.TupleInputStream; | |
import cascading.tuple.io.TupleOutputStream; | |
/** | |
* Tests {@code TupleEntrySerialization}. | |
*/ | |
public class TupleEntrySerializationTest extends CascadingTestCase { | |
@Rule | |
TemporaryFolder tmp = new TemporaryFolder(); | |
@Test | |
public void test_tuple_entry_serialization() throws IOException { | |
// Setup | |
TupleSerialization tupleSerialization = new TupleSerialization(createJobConf()); | |
File file = tmp.newFile(); | |
// @formatter:off | |
TupleEntry testEntry = | |
new TupleEntry( | |
new Fields("x", "y", "z"), | |
new Tuple( | |
new TupleEntry( | |
new Fields("n1", "n2"), | |
new Tuple(4, 5) | |
), // x | |
2, // y | |
3 // z | |
) | |
); | |
// @formatter:on | |
// Output tuple state serialization | |
Tuple outputTuple = writeTuple(tupleSerialization, file, testEntry); | |
// Input for tuple state serialization | |
Tuple inputTuple = readTuple(tupleSerialization, file); | |
// Check the first item in the tuple is a TupleEntry object | |
assertThat(inputTuple.getObject(0)).isInstanceOf(TupleEntry.class); | |
assertThat(inputTuple.toString()).isEqualTo(outputTuple.toString()); | |
} | |
private Tuple writeTuple(TupleSerialization tupleSerialization, File file, TupleEntry tupleEntry) | |
throws FileNotFoundException, IOException { | |
TupleOutputStream output = | |
new HadoopTupleOutputStream(new FileOutputStream(file, false), tupleSerialization.getElementWriter()); | |
Tuple outputTuple = new Tuple(tupleEntry); | |
output.writeTuple(outputTuple); | |
output.close(); | |
return outputTuple; | |
} | |
private Tuple readTuple(TupleSerialization tupleSerialization, File file) throws FileNotFoundException, IOException { | |
TupleInputStream input = | |
new HadoopTupleInputStream(new FileInputStream(file), tupleSerialization.getElementReader()); | |
Tuple inputTuple = input.readTuple(); | |
input.close(); | |
return inputTuple; | |
} | |
private JobConf createJobConf() { | |
JobConf jobConf = new JobConf(); | |
jobConf.set("io.serializations", TupleEntrySerialization.class.getName()); | |
return jobConf; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment