Skip to content

Instantly share code, notes, and snippets.

@shyiko
Created December 4, 2011 10:57
Show Gist options
  • Save shyiko/1429889 to your computer and use it in GitHub Desktop.
Save shyiko/1429889 to your computer and use it in GitHub Desktop.
Hadoop SequenceFileAppender
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
/**
 * Base class for writing the contents of one or more source paths into a single Hadoop
 * {@link SequenceFile}.
 *
 * <p>Subclasses bind {@code K} and {@code V} to concrete {@link Writable} types and implement
 * {@link #getKey(Path)} and {@link #getValues(Path)}. The key and value classes handed to the
 * writer are discovered reflectively from the type arguments of the class that directly extends
 * this one, so those arguments must be concrete classes, for example
 * {@code class MyAppender extends SequenceFileAppender<Text, BytesWritable>}.
 *
 * @param <K> key type written to the sequence file
 * @param <V> value type written to the sequence file
 */
public abstract class SequenceFileAppender<K extends Writable, V extends Writable> {

    private final Configuration configuration;
    /** May be {@code null}, in which case the writer is created without an explicit compression type. */
    private final SequenceFile.CompressionType compressionType;
    private final Class<K> keyClass;
    private final Class<V> valueClass;

    /**
     * Creates an appender that does not pass an explicit compression type to the writer.
     *
     * @param configuration Hadoop configuration used to resolve the file system and create writers
     * @throws IllegalStateException if the key/value classes cannot be determined from the
     *         subclass declaration
     */
    public SequenceFileAppender(Configuration configuration) {
        this(configuration, null);
    }

    /**
     * Creates an appender with an optional explicit compression type.
     *
     * @param configuration Hadoop configuration used to resolve the file system and create writers
     * @param compressionType compression type passed to the writer, or {@code null} to use the
     *        writer factory overload that takes no compression type
     * @throws IllegalStateException if the key/value classes cannot be determined from the
     *         subclass declaration
     */
    protected SequenceFileAppender(Configuration configuration, SequenceFile.CompressionType compressionType) {
        this.configuration = configuration;
        this.compressionType = compressionType;
        // The casts are unchecked because generics are erased at runtime; they are sound as long
        // as the resolved type arguments are the ones bound to K and V by the subclass.
        @SuppressWarnings("unchecked")
        Class<K> resolvedKeyClass = (Class<K>) getGenericType(0);
        @SuppressWarnings("unchecked")
        Class<V> resolvedValueClass = (Class<V>) getGenericType(1);
        this.keyClass = resolvedKeyClass;
        this.valueClass = resolvedValueClass;
    }

    /**
     * Returns the configuration this appender was constructed with.
     *
     * @return the configuration passed to the constructor
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * Writes every value of every source path into a sequence file at {@code target}.
     *
     * <p>For each source, {@link #getKey(Path)} is called once and the resulting key is paired
     * with each element returned by {@link #getValues(Path)}. The writer is closed even if
     * reading a source or writing a record fails.
     *
     * <p>NOTE(review): the writer is obtained through {@code SequenceFile.createWriter}, which
     * presumably creates the target anew rather than extending an existing file - confirm
     * against the Hadoop version in use before relying on true append semantics.
     *
     * @param target path of the sequence file to write
     * @param sources paths whose keys and values are written, in the given order
     * @throws IOException if the writer cannot be created, or a source cannot be read or written
     * @throws NullPointerException if {@code sources} is {@code null}
     */
    public void append(Path target, Path... sources) throws IOException {
        // Fail before the writer is opened so a bad call does not leave an empty target behind.
        if (sources == null) {
            throw new NullPointerException("sources");
        }
        SequenceFile.Writer writer = getWriter(target);
        try {
            for (Path source : sources) {
                K key = getKey(source);
                for (V value : getValues(source)) {
                    writer.append(key, value);
                }
            }
        } finally {
            writer.close();
        }
    }

    /**
     * Supplies the key under which all values of {@code path} are written.
     *
     * @param path source path being processed
     * @return key for the records of this source
     * @throws IOException if the key cannot be derived
     */
    protected abstract K getKey(Path path) throws IOException;

    /**
     * Supplies the values to write for {@code path}; they are written in iteration order.
     *
     * @param path source path being processed
     * @return values of this source; must not be {@code null}
     * @throws IOException if the values cannot be read
     */
    protected abstract List<V> getValues(Path path) throws IOException;

    /**
     * Resolves the class bound to the type parameter at {@code index} (0 for K, 1 for V).
     *
     * <p>The lookup climbs from the runtime class to the class that directly extends
     * {@code SequenceFileAppender}, so indirect subclasses are supported as long as that direct
     * subclass names concrete types.
     */
    private Class<?> getGenericType(int index) {
        // The runtime class is always a strict subclass (this class is abstract), so the walk
        // reaches a class whose superclass is SequenceFileAppender before running out of parents.
        Class<?> directSubclass = getClass();
        while (directSubclass.getSuperclass() != SequenceFileAppender.class) {
            directSubclass = directSubclass.getSuperclass();
        }

        Type superclass = directSubclass.getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            // Raw extension: "extends SequenceFileAppender" without type arguments.
            throw new IllegalStateException(directSubclass.getName()
                    + " must extend SequenceFileAppender with explicit type arguments");
        }

        Type argument = ((ParameterizedType) superclass).getActualTypeArguments()[index];
        if (argument instanceof ParameterizedType) {
            // e.g. SomeWritable<Foo>: the writer only needs the raw class.
            argument = ((ParameterizedType) argument).getRawType();
        }
        if (!(argument instanceof Class)) {
            // Type variables or wildcards cannot be turned into a concrete class here.
            throw new IllegalStateException("Type argument #" + index + " of "
                    + directSubclass.getName() + " is not a concrete class: " + argument);
        }
        return (Class<?>) argument;
    }

    /**
     * Creates a sequence file writer for {@code target}, honouring the optional compression type.
     */
    private SequenceFile.Writer getWriter(Path target) throws IOException {
        // Resolve the file system from the target itself so that fully-qualified paths on a
        // non-default file system work; scheme-less paths still resolve to the default one.
        FileSystem fileSystem = target.getFileSystem(configuration);
        if (compressionType == null) {
            return SequenceFile.createWriter(fileSystem, configuration, target, keyClass, valueClass);
        }
        return SequenceFile.createWriter(fileSystem, configuration, target, keyClass, valueClass,
                compressionType);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment