Hadoop input format for swallowing entire files.
package forma;

import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Cascading source Scheme that reads each input file as a single tuple of
 * (filename, file contents). Sinking is not supported. Because sourceInit
 * configures a JobConf, this scheme must be paired with the old-API
 * ("mapred") WholeFileInputFormat below.
 */
public class WholeFile extends Scheme {

  public WholeFile(Fields fields) {
    super(fields);
  }

  @Override
  public void sourceInit(Tap tap, JobConf conf) {
    conf.setInputFormat(WholeFileInputFormat.class);
  }

  @Override
  public void sinkInit(Tap tap, JobConf conf) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public Tuple source(Object key, Object value) {
    // key is the file name (Text), value is the file contents (BytesWritable).
    Tuple tuple = new Tuple();
    tuple.add(key.toString());
    tuple.add(value);
    return tuple;
  }

  @Override
  public void sink(TupleEntry te, OutputCollector oc) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }
}
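
A hedged usage sketch, not part of the gist: wiring the scheme into a flow, assuming the Cascading 1.x API this scheme targets (Hfs built from a Scheme and a path string, FlowConnector.connect); the pipe and field names are invented for illustration.

package forma;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class WholeFileExample {
  public static void main(String[] args) {
    // Source tap: each input file becomes one ("filename", "bytes") tuple.
    Tap source = new Hfs(new WholeFile(new Fields("filename", "bytes")), args[0]);
    // Sink tap: plain text output, since WholeFile cannot act as a sink.
    Tap sink = new Hfs(new TextLine(), args[1], true);
    Pipe pipe = new Pipe("whole-files");
    Flow flow = new FlowConnector().connect(source, sink, pipe);
    flow.complete();
  }
}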
package forma;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * New-API (org.apache.hadoop.mapreduce) input format that reads each file
 * whole, as a single (NullWritable, BytesWritable) record. Note that this
 * class and the mapred-based version below share a name and package, so
 * they are alternatives: only one can live in the package at a time.
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Never split: each file must be read in one piece.
    return false;
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new WholeFileRecordReader();
  }
}
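
A hedged driver sketch (assumption, not part of the gist): this is how the new-API format would typically be registered on a mapreduce Job. It cannot be used with the Cascading scheme above, which configures a JobConf from the old mapred API.

package forma;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class NewApiDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "whole-file-example");
    job.setJarByClass(NewApiDriver.class);
    // Read each input file whole, as a single record.
    job.setInputFormatClass(WholeFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // ... set mapper, output types, and output path as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}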
package forma;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * Old-API (org.apache.hadoop.mapred) input format that reads each file
 * whole, as a single (Text filename, BytesWritable contents) record.
 * This is the version the Cascading WholeFile scheme expects.
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    // Never split: each file must be read in one piece.
    return false;
  }

  @Override
  public RecordReader<Text, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new WholeFileRecordReader((FileSplit) split, job);
  }
}
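
And the old-API equivalent, again a hedged sketch rather than code from the gist; it mirrors what the Cascading scheme's sourceInit() does with the JobConf.

package forma;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class OldApiDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(OldApiDriver.class);
    // Same call the Cascading scheme makes in sourceInit().
    conf.setInputFormat(WholeFileInputFormat.class);
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    // ... set mapper, output types, and output path, then run ...
    JobClient.runJob(conf);
  }
}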
package forma;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * New-API record reader that emits exactly one record per split: the entire
 * file contents as a BytesWritable, keyed by NullWritable.
 */
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private boolean processed = false;
  private final NullWritable key = NullWritable.get();
  private final BytesWritable value = new BytesWritable();

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = taskAttemptContext.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (!processed) {
      // Slurp the whole file into memory in a single read.
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the input stream is closed in nextKeyValue().
  }
}
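
As a hedged illustration of consuming these records (not part of the gist), a new-API mapper has to recover the file name from the input split, since the key is NullWritable; FileSizeMapper and its output fields are invented names.

package forma;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSizeMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable> {
  @Override
  protected void map(NullWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    // The key carries no information, so take the file name from the split.
    String name = ((FileSplit) context.getInputSplit()).getPath().getName();
    context.write(new Text(name), new IntWritable(value.getLength()));
  }
}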
package forma;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Old-API record reader that emits exactly one record per split: the file
 * name as a Text key and the entire file contents as a BytesWritable value.
 */
class WholeFileRecordReader implements RecordReader<Text, BytesWritable> {

  private final FileSplit fileSplit;
  private final Configuration conf;
  private boolean processed = false;

  public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
    this.fileSplit = fileSplit;
    this.conf = conf;
  }

  @Override
  public boolean next(Text key, BytesWritable value) throws IOException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      // Use the file name as the key.
      key.set(file.getName());
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public BytesWritable createValue() {
    return new BytesWritable();
  }

  @Override
  public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // Nothing to close; the input stream is closed in next().
  }
}
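
The old-API counterpart of the mapper sketch is simpler, since this reader already supplies the file name as the key; again a hedged sketch with an invented class name, not part of the gist.

package forma;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class OldApiFileSizeMapper extends MapReduceBase
    implements Mapper<Text, BytesWritable, Text, IntWritable> {
  public void map(Text key, BytesWritable value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Emit the file name and its size in bytes.
    output.collect(key, new IntWritable(value.getLength()));
  }
}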
org.apache.hadoop.mapred is the old interface and org.apache.hadoop.mapreduce is the new one; the two are mixed here and they don't match. I get this error:
Error: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
How can I fix it?
Hello sritchie, and thank you for sharing your code. I tried to use it with Hadoop 2.2.0 and unfortunately I get the following error:
org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1396022683232_0016_m_000003_2 - exited : Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
Do you have any idea where the error might come from?
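
One likely cause, offered as a hedged note rather than a confirmed fix: between Hadoop 1.x and 2.x, org.apache.hadoop.mapreduce.TaskAttemptContext changed from a class to an interface, so a jar compiled against one major version fails at runtime on the other with exactly this message. Recompiling against the cluster's Hadoop version usually resolves it; sticking to the stable org.apache.hadoop.mapred classes, which the Cascading scheme requires anyway, also sidesteps the problem. A quick way to check what the runtime classpath actually provides:

import org.apache.hadoop.util.VersionInfo;

public class PrintHadoopVersion {
  public static void main(String[] args) {
    // Print the Hadoop version on the runtime classpath; compare it with
    // the version the jar was compiled against.
    System.out.println("Hadoop version: " + VersionInfo.getVersion());
  }
}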