*************************
Gist
*************************
One more gist related to controlling the number of mappers in a MapReduce job.
Background on input splits
--------------------------
An input split is a chunk of the input data allocated to a map task for processing. FileInputFormat
generates input splits (and divides each into records), one input split per file, unless the
file spans more than an HDFS block, in which case it factors in the configured minimum split
size, maximum split size, and block size when determining the split size.
Here's the formula, from Hadoop: The Definitive Guide:
Split size = max(minimumSplitSize, min(maximumSplitSize, HDFSBlockSize))
So, if we go with the default values, the split size equals the HDFS block size for files spanning
more than an HDFS block.
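To make the arithmetic concrete, here is a minimal, illustrative Java sketch of that formula with the default values. It mirrors the computation FileInputFormat performs but is not the actual Hadoop source; the 128 MB block size is an assumption.

public class SplitSizeDemo {
    // The formula from above: max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed HDFS block size: 128 MB
        long minSize = 1L;                   // default minimum split size
        long maxSize = Long.MAX_VALUE;       // default maximum split size
        // With the defaults, the formula collapses to the block size.
        System.out.println(computeSplitSize(minSize, maxSize, blockSize)); // prints 134217728
    }
}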
Problem with MapReduce processing of small files
-------------------------------------------------
We all know that Hadoop works best with large files, but the reality is that we still have to deal
with small files. When a MapReduce job processes many small files, by default each file
is handled by its own map task (so 1000 small files = 1000 map tasks). Having too many tasks that
finish in a matter of seconds is inefficient.
Increasing the minimum split size to reduce the number of map tasks is not the right
solution, as it comes at the potential cost of data locality.
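For reference, that discouraged approach amounts to a single configuration knob in an old-API driver; MyDriver and the 256 MB value below are purely illustrative:

// Discouraged: forcing fewer, larger splits via the minimum split size (old API).
// A split can then span multiple HDFS blocks, so a map task may read much of
// its data from remote nodes, losing data locality.
JobConf conf = new JobConf(MyDriver.class); // MyDriver is a hypothetical driver class
conf.set("mapred.min.split.size", "268435456"); // illustrative: 256 MB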
Solution
---------
CombineFileInputFormat packs many files into each split, providing more data for a map task to process.
It factors in node and rack locality, so performance is not compromised.
Sample program
---------------
The sample program demonstrates that, using CombineFileInputFormat, we can process multiple small files
(each smaller than an HDFS block) in a single map task.
Old API
--------
The new API in the version of Hadoop I am running does not include CombineFileInputFormat.
I will write another gist with a new-API version of the program shortly.
Key aspects of the program
----------------------------
1. CombineFileInputFormat is an abstract class; we have to create a subclass that extends it and
implements the getRecordReader method. This implementation is in ExtendedCombineFileInputFormat.java
(courtesy of http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205).
2. In the driver, set the value of mapred.max.split.size.
3. In the driver, set the input format to the subclass of CombineFileInputFormat (see the fragment below).
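In isolation, points 2 and 3 boil down to the following old-API driver fragment; the 128 MB value matches the full driver later in this gist:

// Key driver settings for combining small files (old API)
conf.set("mapred.max.split.size", "134217728"); // point 2: 128 MB cap per combined split
conf.setInputFormat(ExtendedCombineFileInputFormat.class); // point 3: the CombineFileInputFormat subclass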
*******************************
Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at [email protected] if you encounter any issues.
Directory structure
-------------------
formatProject
  data
    employees_partFiles
      employees_part1
      employees_part2
      employees_part3
      employees_part4
      employees_part5
  formatCombineFileInputFormat
    src
      MapperCombineFileInputFormat.java
      DriverCombineFileInputFormat.java
      ExtendedCombineFileInputFormat.java
    jar
      formatCombineFileInputFormatOAPI.jar
*******************************
Data Structure
*******************************
[EmpNo DOB FName LName Gender HireDate DeptNo]
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
.......
.......
*******************************
Expected Results
*******************************
Key goal of the demonstration: process 5 small files in one map task.
Emit a subset of the input dataset:
[EmpNo FName LName]
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
/********************************************
*File: MapperCombineFileInputFormat.java
*Usage: Mapper
********************************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperCombineFileInputFormat extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    Text txtKey = new Text("");
    Text txtValue = new Text("");

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        if (value.toString().length() > 0) {
            // Input record: EmpNo, DOB, FName, LName, Gender, HireDate, DeptNo (tab-separated)
            String[] arrEmpAttributes = value.toString().split("\\t");
            // Emit EmpNo as the key, FName and LName as the value
            txtKey.set(arrEmpAttributes[0]);
            txtValue.set(arrEmpAttributes[2] + "\t" + arrEmpAttributes[3]);
            output.collect(txtKey, txtValue);
        }
    }
}
/********************************************
*File: DriverCombineFileInputFormat.java
*Usage: Driver
********************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverCombineFileInputFormat {

    public static void main(String[] args) throws Exception {

        // JobConf(Class) also sets the job jar; JobConf(String) would be
        // interpreted as a configuration file, not a job name.
        JobConf conf = new JobConf(DriverCombineFileInputFormat.class);
        conf.setJobName("DriverCombineFileInputFormat");

        // Cap the combined split size; many small files are packed into
        // splits of at most this size.
        conf.set("mapred.max.split.size", "134217728"); // 128 MB

        String[] jobArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        conf.setMapperClass(MapperCombineFileInputFormat.class);
        // Use the CombineFileInputFormat subclass defined in this project
        conf.setInputFormat(ExtendedCombineFileInputFormat.class);
        ExtendedCombineFileInputFormat.addInputPath(conf, new Path(jobArgs[0]));

        // Map-only job
        conf.setNumReduceTasks(0);

        conf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(conf, new Path(jobArgs[1]));
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // runJob blocks until the job completes
        RunningJob job = JobClient.runJob(conf);
        System.exit(job.isSuccessful() ? 0 : 2);
    }
}
/********************************************
*File: ExtendedCombineFileInputFormat.java
*Usage: Subclass implementation of the abstract
*class CombineFileInputFormat
********************************************/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class ExtendedCombineFileInputFormat extends
        CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf conf, Reporter reporter) throws IOException {
        // CombineFileRecordReader instantiates one MyCombineFileRecordReader
        // per file chunk in the combined split, via reflection.
        return new CombineFileRecordReader(conf, (CombineFileSplit) split,
                reporter, (Class) MyCombineFileRecordReader.class);
    }

    public static class MyCombineFileRecordReader implements
            RecordReader<LongWritable, Text> {

        private final LineRecordReader linerecord;

        // Wraps a LineRecordReader around the file chunk at position
        // 'index' within the combined split.
        public MyCombineFileRecordReader(CombineFileSplit split,
                Configuration conf, Reporter reporter, Integer index)
                throws IOException {
            FileSplit filesplit = new FileSplit(split.getPath(index),
                    split.getOffset(index), split.getLength(index),
                    split.getLocations());
            linerecord = new LineRecordReader(conf, filesplit);
        }

        // The methods below simply delegate to the wrapped LineRecordReader.
        @Override
        public void close() throws IOException {
            linerecord.close();
        }

        @Override
        public LongWritable createKey() {
            return linerecord.createKey();
        }

        @Override
        public Text createValue() {
            return linerecord.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return linerecord.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return linerecord.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return linerecord.next(key, value);
        }
    }
}
*****************************
HDFS commands to load data
*****************************
hadoop fs -mkdir formatProject
hadoop fs -put formatProject/data formatProject/
*****************************
Run program
*****************************
hadoop jar ~/Blog/formatProject/formatCombineFileInputFormat/jar/formatCombineFileInputFormatOAPI.jar DriverCombineFileInputFormat /user/akhanolk/formatProject/data/employees_partFiles /user/akhanolk/formatProject/output/output-CombineFileInputFormat
*****************************
Results
*****************************
....
13/09/22 17:16:31 INFO mapred.JobClient: Launched map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Data-local map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=17885
...
$ hadoop fs -ls -R formatProject/output/output-CombineFileInputFormat/part* | awk '{print $8}'
formatProject/output/output-CombineFileInputFormat/part-00000
$ hadoop fs -cat formatProject/output/output-CombineFileInputFormat/part-00000
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
10004 Chirstian Koblick
10005 Kyoichi Maliniak
.....
**************************
References
**************************
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Concepts:
Hadoop: The Definitive Guide
Code:
http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205
Data:
The data in this solution is from the MySQL employees sample database - http://dev.mysql.com/doc/employee/en/index.html