Skip to content

Instantly share code, notes, and snippets.

@btiernay
Created January 11, 2013 15:40
Show Gist options
  • Save btiernay/4511605 to your computer and use it in GitHub Desktop.
Save btiernay/4511605 to your computer and use it in GitHub Desktop.
Custom TupleEntrySerialization implementation that complements the behavior of TupleSerialization. It works by (de)serializing the Fields field and then delegates to a supplied TupleSerialization instance for the Tuple field. Thus, it is possible to nest TupleEntries / Tuples into arbitrary trees and serialize them using Hadoop. This is an extension of TupleSerialization's behavior.
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import cascading.CascadingException;
import cascading.tuple.Comparison;
import cascading.tuple.Fields;
import cascading.tuple.StreamComparator;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.hadoop.SerializationToken;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.BufferedInputStream;
import cascading.tuple.hadoop.io.TupleDeserializer;
import cascading.tuple.hadoop.io.TupleSerializer;
import com.google.common.base.Throwables;
/**
 * Serialization implementation that is required for Hadoop {@link SequenceFile} serialization.
 *
 * <p>Complements {@link TupleSerialization}: the {@link Fields} part of a {@link TupleEntry} is
 * written with Java object serialization, while the {@link Tuple} part is delegated to a
 * {@link TupleSerialization}-provided (de)serializer. This allows TupleEntries/Tuples to be
 * nested into arbitrary trees and serialized by Hadoop.
 */
@SerializationToken(tokens = { 223 }, classNames = { "cascading.tuple.TupleEntry" })
public class TupleEntrySerialization extends Configured implements Comparison<TupleEntry>, Serialization<TupleEntry> {

  /**
   * Delegate for recursively contained {@link Tuple} values.
   */
  private TupleSerialization tupleSerialization;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // Rebuild the delegate whenever Hadoop pushes a new configuration.
    this.tupleSerialization = new TupleSerialization(conf);
  }

  @Override
  public boolean accept(Class<?> c) {
    return TupleEntry.class.isAssignableFrom(c);
  }

  @Override
  public Serializer<TupleEntry> getSerializer(Class<TupleEntry> c) {
    final TupleSerializer tupleSerializer = (TupleSerializer) tupleSerialization.getSerializer(Tuple.class);
    return new TupleEntrySerializer(tupleSerializer);
  }

  @Override
  public Deserializer<TupleEntry> getDeserializer(Class<TupleEntry> c) {
    final TupleDeserializer tupleDeserializer = (TupleDeserializer) tupleSerialization.getDeserializer(Tuple.class);
    return new TupleEntryDeserializer(tupleDeserializer);
  }

  @Override
  public Comparator<TupleEntry> getComparator(Class<TupleEntry> arg0) {
    return new TupleEntryComparator();
  }

  /**
   * {@code TupleEntry} serializer implementation. Writes the {@link Fields} via Java object
   * serialization, then delegates the {@link Tuple} to the supplied {@link TupleSerializer}.
   */
  public static class TupleEntrySerializer implements Serializer<TupleEntry> {

    private DataOutputStream out;
    private final TupleSerializer delegate;

    public TupleEntrySerializer(TupleSerializer tupleSerializer) {
      this.delegate = tupleSerializer;
    }

    @Override
    public void open(OutputStream out) throws IOException {
      delegate.open(out);
      if (out instanceof DataOutputStream) {
        this.out = (DataOutputStream) out;
      } else {
        this.out = new DataOutputStream(out);
      }
    }

    @Override
    public void serialize(TupleEntry t) throws IOException {
      // A fresh ObjectOutputStream is required per record so each record carries its own
      // stream header, matching the per-record ObjectInputStream on the read side.
      ObjectOutputStream output = new ObjectOutputStream(out);
      output.writeObject(t.getFields());
      // BUGFIX: flush the object stream's internal block-data buffer so the Fields bytes
      // actually reach the underlying stream BEFORE the delegate writes the Tuple bytes.
      // Without this, buffered Fields data can be emitted after (or interleaved with) the
      // Tuple data, corrupting the record.
      output.flush();
      delegate.serialize(t.getTuple());
    }

    @Override
    public void close() throws IOException {
      try {
        delegate.close();
      } finally {
        out.close();
      }
    }
  }

  /**
   * {@code TupleEntry} deserializer implementation. Reads the {@link Fields} via Java object
   * serialization, then delegates the {@link Tuple} to the supplied {@link TupleDeserializer}.
   */
  public static class TupleEntryDeserializer implements Deserializer<TupleEntry> {

    private DataInputStream in;
    private final TupleDeserializer delegate;

    public TupleEntryDeserializer(TupleDeserializer tupleDeserializer) {
      this.delegate = tupleDeserializer;
    }

    @Override
    public void open(InputStream in) throws IOException {
      delegate.open(in);
      if (in instanceof DataInputStream) {
        this.in = (DataInputStream) in;
      } else {
        this.in = new DataInputStream(in);
      }
    }

    @Override
    public TupleEntry deserialize(TupleEntry t) throws IOException {
      // Mirror of serialize(): one ObjectInputStream per record, consuming the per-record
      // stream header written by the serializer.
      ObjectInputStream input = new ObjectInputStream(in);
      try {
        Fields fields = (Fields) input.readObject();
        Tuple tuple = delegate.deserialize(null);
        return new TupleEntry(fields, tuple);
      } catch (ClassNotFoundException e) {
        // BUGFIX: Throwables.propagate(e) is deprecated and left unreachable "return null"
        // code behind it; the Deserializer contract already declares IOException, so wrap
        // the failure and preserve the cause.
        throw new IOException("Unable to resolve class while deserializing Fields", e);
      }
    }

    @Override
    public void close() throws IOException {
      try {
        delegate.close();
      } finally {
        in.close();
      }
    }
  }

  /**
   * Comparator for {@code TupleEntry} values and their serialized stream form. Only defines a
   * null ordering for in-memory entries (all non-null entries compare equal); the stream form
   * compares a leading Writable string read from each stream.
   */
  public static class TupleEntryComparator implements StreamComparator<BufferedInputStream>, Comparator<TupleEntry>,
      Serializable {

    @Override
    public int compare(TupleEntry lhs, TupleEntry rhs) {
      // BUGFIX: two nulls must compare equal; returning -1 for (null, null) violated the
      // Comparator contract's symmetry (compare(a,b) == -compare(b,a)). The stream overload
      // below already handled this case correctly.
      if (lhs == null && rhs == null) {
        return 0;
      }
      if (lhs == null) {
        return -1;
      }
      if (rhs == null) {
        return 1;
      }
      // Non-null entries are treated as equal; no deeper ordering is defined here.
      return 0;
    }

    @Override
    public int compare(BufferedInputStream lhsStream, BufferedInputStream rhsStream) {
      try {
        if (lhsStream == null && rhsStream == null) {
          return 0;
        }
        if (lhsStream == null) {
          return -1;
        }
        if (rhsStream == null) {
          return 1;
        }
        // NOTE(review): assumes each serialized entry starts with a WritableUtils-encoded
        // string -- confirm against the on-disk layout produced by the serializer above.
        String lhsString = WritableUtils.readString(new DataInputStream(lhsStream));
        String rhsString = WritableUtils.readString(new DataInputStream(rhsStream));
        return lhsString.compareTo(rhsString);
      } catch (IOException exception) {
        throw new CascadingException(exception);
      }
    }
  }
}
import static org.fest.assertions.api.Assertions.assertThat;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import cascading.CascadingTestCase;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;
/**
 * Tests {@code TupleEntrySerialization}: round-trips a nested {@code TupleEntry} through the
 * Hadoop tuple streams and checks the nested entry survives (de)serialization.
 */
public class TupleEntrySerializationTest extends CascadingTestCase {

  // BUGFIX: JUnit 4 requires @Rule fields to be public; a package-private rule field fails
  // rule validation (and is silently ignored by older runners), so the temporary folder
  // would never be created/cleaned up.
  // NOTE(review): CascadingTestCase may be JUnit-3 based (extending TestCase), in which case
  // @Rule/@Test are not honored at all -- confirm against the Cascading version in use.
  @Rule
  public TemporaryFolder tmp = new TemporaryFolder();

  @Test
  public void test_tuple_entry_serialization() throws IOException {
    // Setup
    TupleSerialization tupleSerialization = new TupleSerialization(createJobConf());
    File file = tmp.newFile();
    // A TupleEntry nested inside a Tuple, to exercise recursive (de)serialization.
    // @formatter:off
    TupleEntry testEntry =
        new TupleEntry(
            new Fields("x", "y", "z"),
            new Tuple(
                new TupleEntry(
                    new Fields("n1", "n2"),
                    new Tuple(4, 5)
                ), // x
                2, // y
                3  // z
            )
        );
    // @formatter:on
    // Output tuple state serialization
    Tuple outputTuple = writeTuple(tupleSerialization, file, testEntry);
    // Input for tuple state serialization
    Tuple inputTuple = readTuple(tupleSerialization, file);
    // Check the first item in the tuple is a TupleEntry object
    assertThat(inputTuple.getObject(0)).isInstanceOf(TupleEntry.class);
    assertThat(inputTuple.toString()).isEqualTo(outputTuple.toString());
  }

  /** Serializes {@code tupleEntry} (wrapped in a Tuple) to {@code file} and returns the wrapper. */
  private Tuple writeTuple(TupleSerialization tupleSerialization, File file, TupleEntry tupleEntry)
      throws FileNotFoundException, IOException {
    TupleOutputStream output =
        new HadoopTupleOutputStream(new FileOutputStream(file, false), tupleSerialization.getElementWriter());
    Tuple outputTuple = new Tuple(tupleEntry);
    output.writeTuple(outputTuple);
    output.close();
    return outputTuple;
  }

  /** Reads back a single Tuple previously written to {@code file} by {@link #writeTuple}. */
  private Tuple readTuple(TupleSerialization tupleSerialization, File file) throws FileNotFoundException, IOException {
    TupleInputStream input =
        new HadoopTupleInputStream(new FileInputStream(file), tupleSerialization.getElementReader());
    Tuple inputTuple = input.readTuple();
    input.close();
    return inputTuple;
  }

  /** Builds a JobConf registering the serialization under test. */
  private JobConf createJobConf() {
    JobConf jobConf = new JobConf();
    // NOTE(review): set() REPLACES any default value of io.serializations rather than
    // appending to it -- confirm the remaining serializations the job needs are still
    // registered elsewhere (e.g. by TupleSerialization itself).
    jobConf.set("io.serializations", TupleEntrySerialization.class.getName());
    return jobConf;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment