Created March 11, 2013 11:55
-
-
Save ashwanthkumar/5133733 to your computer and use it in GitHub Desktop.
A MapReduce InputFormat for HBase's HFile.
- Tested on HBase 0.94.2 and Hadoop 1.0.1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | |
import org.apache.hadoop.hbase.io.hfile.CacheConfig; | |
import org.apache.hadoop.hbase.io.hfile.HFile; | |
import org.apache.hadoop.hbase.io.hfile.HFileScanner; | |
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.RecordReader; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.FileSplit; | |
import java.io.IOException; | |
/** | |
* A MapReduce InputFormat for HBase's HFile. | |
*/ | |
class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, KeyValue> { | |
@Override | |
public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { | |
return new HFileRecordReader(); | |
} | |
private class HFileRecordReader extends RecordReader<ImmutableBytesWritable, KeyValue> { | |
HFile.Reader reader; | |
HFileScanner scanner; | |
Integer entryNumber = 0; | |
@Override | |
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { | |
SchemaMetrics.configureGlobally(context.getConfiguration()); | |
Path path = ((FileSplit) split).getPath(); | |
FileSystem fs = org.apache.hadoop.fs.FileSystem.get(context.getConfiguration()); | |
reader = HFile.createReader(fs, path, new CacheConfig(context.getConfiguration())); | |
scanner = reader.getScanner(false, false); | |
reader.loadFileInfo(); | |
} | |
@Override | |
public boolean nextKeyValue() throws IOException, InterruptedException { | |
entryNumber += 1; | |
if (!scanner.isSeeked()) | |
// Had to move this here because "nextKeyValue" is called before the first getCurrentKey | |
// which was causing us to miss the first row of the HFile. | |
return scanner.seekTo(); | |
else { | |
return scanner.next(); | |
} | |
} | |
@Override | |
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { | |
return new ImmutableBytesWritable(scanner.getKeyValue().getRow()); | |
} | |
@Override | |
public KeyValue getCurrentValue() throws IOException, InterruptedException { | |
return scanner.getKeyValue(); | |
} | |
@Override | |
public float getProgress() throws IOException, InterruptedException { | |
return (entryNumber / (float) reader.getEntries()); | |
} | |
@Override | |
public void close() throws IOException { | |
if (reader != null) { | |
reader.close(); | |
} | |
} | |
} | |
} |
Tested with HBase client 1.2.6 and server 1.2.0-cdh5.16.2.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add a default constructor: