Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created March 22, 2012 14:48
Show Gist options
  • Save tf0054/2158742 to your computer and use it in GitHub Desktop.
Save tf0054/2158742 to your computer and use it in GitHub Desktop.
Hack#15
package org.hadoophacks.mapreduce.lib.input;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;
/**
 * Line-oriented record reader for a single chunk of a {@link CombineFileSplit}.
 *
 * <p>Every record's key is {@code "<file name>:<position>"}, where the position is this reader's
 * byte offset at the moment the read for that record started, and the value is the text returned
 * by {@link LineReader}.
 *
 * <p>The three-argument constructor (split, context, chunk index as {@code Integer}) is presumably
 * the shape a wrapping {@code CombineFileRecordReader} instantiates reflectively, so its signature
 * must not change -- confirm against the Hadoop version in use.
 */
public class CombineFileLineRecordReader extends RecordReader<Text, Text> {

  /** Configuration key holding the maximum accepted line length, in bytes. */
  private static final String MAX_LINE_LENGTH_KEY = "mapred.linerecordreader.maxlength";

  private CompressionCodecFactory compressionCodecs = null;
  /** Offset of the first byte this reader is responsible for. */
  private long start;
  /** Offset of the next byte to be consumed. */
  private long pos;
  /** Offset one past the last byte of this chunk ({@code Long.MAX_VALUE} for compressed input). */
  private long end;
  private LineReader in;
  private int maxLineLength;
  private Text key = null;
  private Text value = null;
  /** Index of the chunk inside the combined split that this reader handles. */
  private final int idx;
  /** Final path component of the chunk's file; used as the key prefix. */
  private final String fileName;

  /**
   * Creates a reader bound to chunk {@code idx} of {@code split}.
   *
   * @param split the combined split that contains the chunk
   * @param context task context (not used here; read later in {@link #initialize})
   * @param idx index of the chunk within {@code split}
   */
  public CombineFileLineRecordReader(CombineFileSplit split, TaskAttemptContext context,
      Integer idx) {
    this.idx = idx;
    this.fileName = split.getPath(idx).getName();
  }

  /**
   * Opens the chunk's file and positions the reader on the first line that belongs to the chunk.
   *
   * @param genericSplit the combined split; must be a {@link CombineFileSplit}
   * @param context task context supplying the job configuration
   * @throws IOException if the file cannot be opened, seeked or read
   */
  @Override
  public void initialize(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    CombineFileSplit split = (CombineFileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH_KEY, Integer.MAX_VALUE);

    // Use the offset and length of THIS chunk. The no-argument getLength() is the total over all
    // chunks of the combined split, which would push "end" past the chunk boundary and make this
    // reader emit lines that belong to a neighbouring chunk of the same file.
    start = split.getOffset(idx);
    end = start + split.getLength(idx);

    Path file = split.getPath(idx);
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);
    boolean initialized = false;
    try {
      boolean skipFirstLine = false;
      if (codec != null) {
        in = new LineReader(codec.createInputStream(fileIn), job);
        // Compressed input is read until the stream is exhausted.
        end = Long.MAX_VALUE;
      } else {
        if (start != 0) {
          // Back up one byte so that a line starting exactly at "start" is not discarded together
          // with the partial line that precedes it.
          skipFirstLine = true;
          --start;
          fileIn.seek(start);
        }
        in = new LineReader(fileIn, job);
      }
      if (skipFirstLine) { // skip first line and re-establish "start".
        start += in.readLine(new Text(), 0,
            (int) Math.min((long) Integer.MAX_VALUE, end - start));
      }
      this.pos = start;
      initialized = true;
    } finally {
      if (!initialized) {
        // Do not leak the open stream when set-up fails part-way through.
        in = null;
        try {
          fileIn.close();
        } catch (IOException ignored) {
          // Best-effort cleanup; the failure that brought us here is the one to report.
        }
      }
    }
  }

  /**
   * Advances to the next line of the chunk.
   *
   * <p>A read that consumes {@code maxLineLength} bytes or more is treated as an over-long line
   * and the loop moves on to the next one while the position is still inside the chunk.
   *
   * @return {@code true} if a record was read; {@code false} once nothing more could be read, in
   *     which case the current key and value are reset to {@code null}
   * @throws IOException if reading from the underlying stream fails
   */
  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new Text();
    }
    key.set(fileName + ":" + pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
              maxLineLength));
      if (newSize == 0) {
        break; // nothing left to read
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break; // acceptable line found
      }
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    }
    return true;
  }

  /** @return the current key, or {@code null} before the first record and after the last one */
  @Override
  public Text getCurrentKey() {
    return key;
  }

  /** @return the current value, or {@code null} before the first record and after the last one */
  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Reports the fraction of the chunk consumed so far.
   *
   * @return a value in {@code [0, 1]}; {@code 0} for an empty chunk. For compressed input the end
   *     offset is {@code Long.MAX_VALUE}, so the reported value stays close to zero.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    }
    return Math.min(1.0f, (pos - start) / (float) (end - start));
  }

  /**
   * Closes the underlying line reader, if one was opened.
   *
   * @throws IOException if closing the reader fails
   */
  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment