How to read zip files from a MapReduce job -- Rolling your own input format (source: https://github.com/cotdp)
ZipFileInputFormat.java

package com.jteso.hadoop.contrib.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Emits one record per file inside the zip archive:
 * <ul>
 * <li>Text: file name of the zip entry</li>
 * <li>BytesWritable: uncompressed content of the entry</li>
 * </ul>
 * @author JTeso
 */
public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /**
     * Tell Hadoop not to split the zip file: an archive can only be read
     * sequentially from the beginning, so each file is a single split.
     */
    @Override
    protected boolean isSplitable(JobContext ctx, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ZipFileRecordReader();
    }
}
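For completeness, a driver could wire this input format into a job roughly as follows. This is a minimal sketch, not part of the original gist: ZipDriver and ZipEntryMapper (sketched after the record reader below) are hypothetical names, and the constructor-style Job creation matches the Hadoop 1.x API this gist was written against.

package com.jteso.hadoop.contrib.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ZipDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "zip-file-reader");
        job.setJarByClass(ZipDriver.class);

        // Use the custom input format so each zip entry becomes one record
        job.setInputFormatClass(ZipFileInputFormat.class);
        job.setMapperClass(ZipEntryMapper.class);
        job.setNumReduceTasks(0); // map-only job

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}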
ZipFileRecordReader.java

package com.jteso.hadoop.contrib.inputformat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Iterates through the records in a given split. Here each record is one
 * entry of a zip archive; because the input is never split, a single
 * reader walks the whole file.
 *
 * @see ZipFileInputFormat
 * @author Jteso
 */
public class ZipFileRecordReader extends RecordReader<Text, BytesWritable> {

    private FSDataInputStream fsin;
    private ZipInputStream zip;
    private Text currentKey;
    private BytesWritable currentValue;
    private boolean isFinished = false;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = ((FileSplit) inputSplit).getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Open the file from HDFS and wrap it in a ZipInputStream
        fsin = fs.open(path);
        zip = new ZipInputStream(fsin);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        ZipEntry entry = zip.getNextEntry();
        if (entry == null) {
            isFinished = true;
            return false;
        }

        // Key: the name of the entry inside the archive
        currentKey = new Text(entry.getName());

        // Value: the entry's uncompressed bytes, buffered fully in memory
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int bytesRead;
        while ((bytesRead = zip.read(buffer, 0, buffer.length)) > 0) {
            bos.write(buffer, 0, bytesRead);
        }
        zip.closeEntry();
        currentValue = new BytesWritable(bos.toByteArray());
        return true;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Entry sizes are unknown up front, so progress is all-or-nothing
        return isFinished ? 1 : 0;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public void close() throws IOException {
        // Best-effort cleanup; closing the ZipInputStream also closes fsin,
        // but both are closed defensively here
        try { zip.close(); } catch (Exception e) { }
        try { fsin.close(); } catch (Exception e) { }
    }
}
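As a usage illustration, a mapper consuming these (Text, BytesWritable) pairs could look like the sketch below. ZipEntryMapper is the hypothetical class name referenced by the driver sketch above; it simply emits each entry's file name together with its uncompressed size.

package com.jteso.hadoop.contrib.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ZipEntryMapper extends Mapper<Text, BytesWritable, Text, LongWritable> {

    private final LongWritable size = new LongWritable();

    @Override
    protected void map(Text filename, BytesWritable content, Context context)
            throws IOException, InterruptedException {
        // getLength() is the number of valid bytes; getBytes() may be padded past it
        size.set(content.getLength());
        context.write(filename, size);
    }
}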
I am getting the exception below. My file size is very large ...
2016-10-12 02:30:31,051 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at com.thomsonretuers.hbase.ZipFileRecordReader.nextKeyValue(ZipFileRecordReader.java:128)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
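The OutOfMemoryError comes from nextKeyValue() buffering the entire uncompressed entry into a ByteArrayOutputStream before handing it to the mapper, so any single entry larger than the mapper heap will kill the task. Raising the mapper heap (mapreduce.map.java.opts in Hadoop 2, mapred.child.java.opts in Hadoop 1) only pushes the limit out. One workaround is to cap the buffered size and skip oversized entries; a minimal sketch of a modified nextKeyValue(), not part of the original gist and using an arbitrary 64 MB example cap, is:

private static final long MAX_ENTRY_BYTES = 64L * 1024 * 1024; // example cap, tune to your heap

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    ZipEntry entry;
    while ((entry = zip.getNextEntry()) != null) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        long total = 0;
        int bytesRead;
        boolean tooBig = false;
        while ((bytesRead = zip.read(buffer, 0, buffer.length)) > 0) {
            total += bytesRead;
            if (total > MAX_ENTRY_BYTES) {
                tooBig = true; // stop buffering; closeEntry() skips the rest of this entry
                break;
            }
            bos.write(buffer, 0, bytesRead);
        }
        zip.closeEntry();
        if (tooBig) {
            continue; // skip the oversized entry and try the next one
        }
        currentKey = new Text(entry.getName());
        currentValue = new BytesWritable(bos.toByteArray());
        return true;
    }
    isFinished = true;
    return false;
}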