Hadoop IO - BinaryFileInputFormat

A small set of Hadoop MapReduce classes for reading whole binary files as byte[] records: a non-splittable FileInputFormat, a record reader that loads each file into memory, and a CombineFileInputFormat variant that packs many small files into fewer splits.
BinaryFileInputFormat.java
package com.kennycason.hadoop.io;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/** An {@link InputFormat} for reading binary data (byte[]) */
public class BinaryFileInputFormat extends FileInputFormat<NullWritable, ImmutableBytesWritable> {

    @Override
    public RecordReader<NullWritable, ImmutableBytesWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
        return new BinaryFileRecordReader();
    }

    @Override
    protected boolean isSplitable(final JobContext context, final Path file) {
        // each file is emitted as one whole record, so never split it
        return false;
    }
}
BinaryFileMapReduceJob.java
package com.kennycason.hadoop.io;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class BinaryFileMapReduceJob {
    private static final long INPUT_MAX_SPLIT_SIZE = (long) 1024 * 1024 * 1024;

    /** initialize m/r job code omitted **/

    private void setupInputParameters(final Job jobInstance) throws IOException {
        jobInstance.setInputFormatClass(CombineBinaryFileInputFormat.class);
        FileInputFormat.addInputPath(jobInstance, new Path("/user/kenny/binary_files/"));
        jobInstance.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", INPUT_MAX_SPLIT_SIZE);
        jobInstance.setMapperClass(BinaryFileMapReduceMapper.class);
    }
}
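
The rest of the job initialization is omitted above. As a minimal sketch, not part of the original gist, the missing pieces might look like the driver below: a map-only job that reads combined binary-file splits and writes the mapper's ImmutableBytesWritable pairs as SequenceFiles. The driver class name, the SequenceFile output format, and the argument handling are all assumptions.

package com.kennycason.hadoop.io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Hypothetical driver sketch; it mirrors setupInputParameters() above and adds
// the output/submission steps the gist leaves out.
public class BinaryFileJobDriver {
    public static void main(final String[] args) throws Exception {
        final Job job = Job.getInstance(new Configuration(), "binary-file-job");
        job.setJarByClass(BinaryFileJobDriver.class);

        // input: whole binary files packed into combined splits of at most 1 GB
        job.setInputFormatClass(CombineBinaryFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024);

        // map-only job; the gist defines no reducer
        job.setMapperClass(BinaryFileMapReduceMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(ImmutableBytesWritable.class);

        // output format is an assumption: binary-friendly SequenceFiles
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}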
BinaryFileMapReduceMapper.java
package com.kennycason.hadoop.io;

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class BinaryFileMapReduceMapper extends Mapper<Writable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {}

    @Override
    protected void map(final Writable ignoredKey,
                       final ImmutableBytesWritable value,
                       final Context context) throws IOException, InterruptedException {
        // do stuff
    }
}
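
The map body above is a stub. Purely as an illustration, and not part of the gist, one possible body keys each file's contents by its MD5 digest so a later step could deduplicate identical files; it assumes commons-codec's DigestUtils is available, which it normally is on a Hadoop classpath.

    // illustrative only; one possible replacement for the "do stuff" stub above
    @Override
    protected void map(final Writable ignoredKey,
                       final ImmutableBytesWritable value,
                       final Context context) throws IOException, InterruptedException {
        final byte[] fileBytes = value.copyBytes();   // defensive copy of the backing buffer
        final byte[] digest = org.apache.commons.codec.digest.DigestUtils.md5(fileBytes);
        context.write(new ImmutableBytesWritable(digest), new ImmutableBytesWritable(fileBytes));
    }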
BinaryFileRecordReader.java
package com.kennycason.hadoop.io;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/** Reads an entire file as a single (NullWritable, byte[]) record. */
public class BinaryFileRecordReader extends RecordReader<NullWritable, ImmutableBytesWritable> {
    private FSDataInputStream fsDataInputStream;
    private byte[] fileBytes;
    private int fileBytesLength;
    private boolean finishedReadingBytes;

    @Override
    public void initialize(final InputSplit inputSplit, final TaskAttemptContext context) throws IOException {
        if (!(inputSplit instanceof FileSplit)) {
            throw new IllegalArgumentException("Only FileSplits are allowed, found: " + inputSplit.getClass());
        }
        final FileSplit fileSplit = (FileSplit) inputSplit;
        fsDataInputStream = fileSplit.getPath()
                .getFileSystem(context.getConfiguration())
                .open(fileSplit.getPath());
        // the split covers the whole file (isSplitable == false), so buffer it all
        fileBytesLength = (int) fileSplit.getLength();
        fileBytes = new byte[fileBytesLength];
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (finishedReadingBytes) {
            return false;
        }
        fsDataInputStream.readFully(fileBytes);
        finishedReadingBytes = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    @Override
    public ImmutableBytesWritable getCurrentValue() {
        return new ImmutableBytesWritable(fileBytes);
    }

    @Override
    public float getProgress() {
        return finishedReadingBytes ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        fsDataInputStream.close();
    }
}
CombineBinaryFileInputFormat.java
package com.kennycason.hadoop.io;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.IOException;

/**
 * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
 * <code>BinaryFileInputFormat</code>.
 *
 * @see CombineFileInputFormat
 */
public class CombineBinaryFileInputFormat extends CombineFileInputFormat<NullWritable, ImmutableBytesWritable> {

    @Override
    public RecordReader<NullWritable, ImmutableBytesWritable> createRecordReader(final InputSplit split,
                                                                                 final TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<NullWritable, ImmutableBytesWritable>((CombineFileSplit) split, context, BinaryFileRecordReaderWrapper.class);
    }

    /**
     * A record reader that may be passed to <code>CombineFileRecordReader</code>
     * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
     * for <code>BinaryFileInputFormat</code>.
     *
     * @see CombineFileRecordReader
     * @see CombineFileInputFormat
     * @see BinaryFileInputFormat
     */
    private static class BinaryFileRecordReaderWrapper extends CombineFileRecordReaderWrapper<NullWritable, ImmutableBytesWritable> {
        public BinaryFileRecordReaderWrapper(final CombineFileSplit split,
                                             final TaskAttemptContext context,
                                             final Integer idx) throws IOException, InterruptedException {
            super(new BinaryFileInputFormat(), split, context, idx);
        }
    }
}
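
One caveat the gist does not address: CombineFileInputFormat builds combined splits out of per-file chunks, and for very large files a chunk may cover only part of a file, while BinaryFileRecordReader always reads from the beginning of the file for the chunk's length. If whole-file records must hold for files of any size, a common pattern is to also mark files as non-splittable in the combine format. A sketch of that override (hedged addition, not in the original; it also needs imports for JobContext and org.apache.hadoop.fs.Path):

    // force one chunk per file so the wrapped BinaryFileRecordReader always sees a whole file
    @Override
    protected boolean isSplitable(final JobContext context, final Path file) {
        return false;
    }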