Skip to content

Instantly share code, notes, and snippets.

@jteso
Created February 20, 2012 05:56
Show Gist options
  • Save jteso/1868049 to your computer and use it in GitHub Desktop.
Save jteso/1868049 to your computer and use it in GitHub Desktop.
How to read zip files from a MapReduce job — rolling your own input format (source: https://github.com/cotdp)
package com.jteso.hadoop.contrib.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * A {@code FileInputFormat} that treats an entire ZIP archive as a single,
 * unsplittable input. Records are produced by {@link ZipFileRecordReader} as:
 * <ul>
 * <li>Text: name of the entry inside the archive</li>
 * <li>BytesWritable: uncompressed content of that entry</li>
 * </ul>
 *
 * @author JTeso
 */
public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /**
     * ZIP archives cannot be decoded starting from an arbitrary byte offset,
     * so we tell Hadoop never to split the file; each archive is handled by
     * a single mapper.
     */
    @Override
    protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext ctx, Path filename) {
        return false;
    }

    /**
     * @return a fresh {@link ZipFileRecordReader}; the framework calls
     *         {@code initialize(...)} on it before first use.
     */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ZipFileRecordReader();
    }
}
package com.jteso.hadoop.contrib.inputformat;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Iterates through the entries of a single ZIP archive (the whole split),
 * emitting one record per entry: the key is the entry's name and the value
 * is the entry's fully uncompressed bytes.
 *
 * <p><b>NOTE:</b> each entry is buffered entirely in memory before being
 * handed to the mapper, so an archive containing an entry larger than the
 * task heap will fail with an {@code OutOfMemoryError}. This reader is only
 * suitable for archives made of reasonably small files.
 *
 * @see ZipFileInputFormat
 * @author Jteso
 */
public class ZipFileRecordReader extends RecordReader<Text, BytesWritable>
{
    /** Copy-buffer size used while draining each ZIP entry. */
    private static final int BUFFER_SIZE = 8192;

    /** Raw stream over the underlying HDFS file; closed in {@link #close()}. */
    private FSDataInputStream fsin;
    /** Decompressing view over {@link #fsin}. */
    private ZipInputStream zip;
    /** Key/value for the entry most recently read by {@link #nextKeyValue()}. */
    private Text currentKey;
    private BytesWritable currentValue;
    /** True once the archive's last entry has been consumed. */
    private boolean isFinished = false;

    /**
     * Opens the split's file and wraps it in a {@link ZipInputStream}.
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException
    {
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = ((FileSplit) inputSplit).getPath();
        FileSystem fs = path.getFileSystem(conf);
        fsin = fs.open(path);
        zip = new ZipInputStream(fsin);
    }

    /**
     * Advances to the next ZIP entry, loading its name and full contents.
     *
     * @return false when the archive has no more entries
     * @throws IOException if the archive is corrupt or the stream fails
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException
    {
        ZipEntry entry = zip.getNextEntry();
        if (entry == null)
        {
            isFinished = true;
            return false;
        }
        // Key: the entry's path/name inside the archive.
        currentKey = new Text(entry.getName());
        // Value: drain the entire entry into memory (see class-level NOTE
        // about the OutOfMemoryError risk for very large entries).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead;
        while ((bytesRead = zip.read(buffer, 0, BUFFER_SIZE)) != -1)
        {
            bos.write(buffer, 0, bytesRead);
        }
        zip.closeEntry();
        currentValue = new BytesWritable(bos.toByteArray());
        return true;
    }

    /**
     * Coarse progress: 0 until the last entry is read, then 1. Per-entry
     * progress is unknown because ZIP entry sizes may not be declared.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException
    {
        return isFinished ? 1 : 0;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException
    {
        return currentKey;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException
    {
        return currentValue;
    }

    /**
     * Closes both streams. Failures are deliberately ignored: at this point
     * all records have been delivered and there is nothing useful to recover.
     */
    @Override
    public void close() throws IOException
    {
        try { zip.close(); } catch (Exception ignored) { }
        try { fsin.close(); } catch (Exception ignored) { }
    }
}
@SuThakur
Copy link

I am getting the exception below — my file size is very large ...
2016-10-12 02:30:31,051 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at com.thomsonretuers.hbase.ZipFileRecordReader.nextKeyValue(ZipFileRecordReader.java:128)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment