Forked from airawat/00-SecondarySortJavaMapReduce
Last active
August 29, 2015 14:09
-
-
Save risarora/0f1003f46d11ddc645d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Secondary sort in Mapreduce | |
With the MapReduce framework, the keys are sorted but the values associated with each key
are not. In order for the values to be sorted, we need to write code to perform what is
referred to as a secondary sort. The sample code in this gist demonstrates such a sort.
The input to the program is a bunch of employee attributes. | |
The output required is department number (deptNo) in ascending order, and the employee last name, | |
first name and employee ID in descending order. | |
The recipe to get the effect of sorting by value is: | |
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo). | |
2) The sort comparator should order by the composite key, that is, the natural key and natural | |
value. | |
3) The partitioner and grouping comparator for the composite key should consider only the natural | |
key for partitioning and grouping. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************* | |
*Data and code download | |
******************************* | |
Data and code: | |
-------------- | |
gitHub: | |
<<To be added>> | |
Email me at [email protected] if you encounter any issues | |
Directory structure | |
------------------- | |
sortProject | |
data | |
employees_tsv | |
employees_tsv | |
SecondarySortBasic | |
src | |
CompositeKeyWritable.java | |
SecondarySortBasicMapper.java | |
SecondarySortBasicPartitioner.java | |
SecondarySortBasicCompKeySortComparator.java | |
SecondarySortBasicGroupingComparator.java | |
SecondarySortBasicReducer.java | |
SecondarySortBasicDriver.java | |
jar | |
SecondarySortBasic.jar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************* | |
*Sample Data | |
******************************* | |
EmpID DOB FName LName Gender Hire date DeptID | |
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 | |
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004 | |
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 | |
.... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************* | |
*Expected results | |
******************************* | |
Sort order: [DeptID asc, {LName,FName,EmpID} desc] | |
DeptID LName FName EmpID | |
d001 Zykh Sudhanshu 205927 | |
d001 Zykh Nidapan 452738 | |
.. | |
d001 Yoshimura Alenka 463297 | |
d001 Yeung Yuguang 483161 | |
.. | |
d001 Acton Basim 105207 | |
d001 Aamodt Sreekrishna 493601 | |
.. | |
d002 Aamodt Yakkov 43290 | |
.. | |
d003 Acton Idoia 211583 | |
.. | |
d004 dAstous Candido 59201 | |
d004 dAstous Berhard 427930 | |
.. | |
d005 Zizka Aamer 409151 | |
d005 Zirintsis Xiaoqiang 52246 | |
.... | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*************************************************************** | |
*CustomWritable for the composite key: CompositeKeyWritable | |
****************************************************************/ | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.io.WritableUtils; | |
/** | |
* | |
* @author akhanolkar | |
* | |
* Purpose: A custom writable with two attributes- deptNo and | |
* NameEmpIDPair; | |
*/ | |
public class CompositeKeyWritable implements Writable, | |
WritableComparable<CompositeKeyWritable> { | |
private String deptNo; | |
private String lNameEmpIDPair; | |
public CompositeKeyWritable() { | |
} | |
public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) { | |
this.deptNo = deptNo; | |
this.lNameEmpIDPair = lNameEmpIDPair; | |
} | |
@Override | |
public String toString() { | |
return (new StringBuilder().append(deptNo).append("\t") | |
.append(lNameEmpIDPair)).toString(); | |
} | |
public void readFields(DataInput dataInput) throws IOException { | |
deptNo = WritableUtils.readString(dataInput); | |
lNameEmpIDPair = WritableUtils.readString(dataInput); | |
} | |
public void write(DataOutput dataOutput) throws IOException { | |
WritableUtils.writeString(dataOutput, deptNo); | |
WritableUtils.writeString(dataOutput, lNameEmpIDPair); | |
} | |
public int compareTo(CompositeKeyWritable objKeyPair) { | |
// TODO: | |
/* | |
* Note: This code will work as it stands; but when CompositeKeyWritable | |
* is used as key in a map-reduce program, it is de-serialized into an | |
* object for comapareTo() method to be invoked; | |
* | |
* To do: To optimize for speed, implement a raw comparator - will | |
* support comparison of serialized representations | |
*/ | |
int result = deptNo.compareTo(objKeyPair.deptNo); | |
if (0 == result) { | |
result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair); | |
} | |
return result; | |
} | |
public String getDeptNo() { | |
return deptNo; | |
} | |
public void setDeptNo(String deptNo) { | |
this.deptNo = deptNo; | |
} | |
public String getLNameEmpIDPair() { | |
return lNameEmpIDPair; | |
} | |
public void setLNameEmpIDPair(String lNameEmpIDPair) { | |
this.lNameEmpIDPair = lNameEmpIDPair; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*************************************************************** | |
*Mapper: SecondarySortBasicMapper | |
***************************************************************/ | |
import java.io.IOException; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class SecondarySortBasicMapper extends | |
Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> { | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
if (value.toString().length() > 0) { | |
String arrEmpAttributes[] = value.toString().split("\\t"); | |
context.write( | |
new CompositeKeyWritable( | |
arrEmpAttributes[6].toString(), | |
(arrEmpAttributes[3].toString() + "\t" | |
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0] | |
.toString())), NullWritable.get()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*************************************************************** | |
*Partitioner: SecondarySortBasicPartitioner | |
***************************************************************/ | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.mapreduce.Partitioner; | |
public class SecondarySortBasicPartitioner extends | |
Partitioner<CompositeKeyWritable, NullWritable> { | |
@Override | |
public int getPartition(CompositeKeyWritable key, NullWritable value, | |
int numReduceTasks) { | |
return (key.getDeptNo().hashCode() % numReduceTasks); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*************************************************************** | |
*SortComparator: SecondarySortBasicCompKeySortComparator | |
*****************************************************************/ | |
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicCompKeySortComparator extends WritableComparator { | |
protected SecondarySortBasicCompKeySortComparator() { | |
super(CompositeKeyWritable.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
CompositeKeyWritable key1 = (CompositeKeyWritable) w1; | |
CompositeKeyWritable key2 = (CompositeKeyWritable) w2; | |
int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo()); | |
if (cmpResult == 0)// same deptNo | |
{ | |
return -key1.getLNameEmpIDPair() | |
.compareTo(key2.getLNameEmpIDPair()); | |
//If the minus is taken out, the values will be in | |
//ascending order | |
} | |
return cmpResult; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************/
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.io.WritableComparator; | |
public class SecondarySortBasicGroupingComparator extends WritableComparator { | |
protected SecondarySortBasicGroupingComparator() { | |
super(CompositeKeyWritable.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
CompositeKeyWritable key1 = (CompositeKeyWritable) w1; | |
CompositeKeyWritable key2 = (CompositeKeyWritable) w2; | |
return key1.getDeptNo().compareTo(key2.getDeptNo()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/***************************************
*Reducer: SecondarySortBasicReducer
***************************************/
import java.io.IOException; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.mapreduce.Reducer; | |
public class SecondarySortBasicReducer | |
extends | |
Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> { | |
@Override | |
public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values, | |
Context context) throws IOException, InterruptedException { | |
for (NullWritable value : values) { | |
context.write(key, NullWritable.get()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/***************************************
*Driver: SecondarySortBasicDriver
***************************************/
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class SecondarySortBasicDriver extends Configured implements Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
System.out | |
.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n"); | |
return -1; | |
} | |
Job job = new Job(getConf()); | |
job.setJobName("Secondary sort example"); | |
job.setJarByClass(SecondarySortBasicDriver.class); | |
FileInputFormat.setInputPaths(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.setMapperClass(SecondarySortBasicMapper.class); | |
job.setMapOutputKeyClass(CompositeKeyWritable.class); | |
job.setMapOutputValueClass(NullWritable.class); | |
job.setPartitionerClass(SecondarySortBasicPartitioner.class); | |
job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class); | |
job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class); | |
job.setReducerClass(SecondarySortBasicReducer.class); | |
job.setOutputKeyClass(CompositeKeyWritable.class); | |
job.setOutputValueClass(NullWritable.class); | |
job.setNumReduceTasks(8); | |
boolean success = job.waitForCompletion(true); | |
return success ? 0 : 1; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), | |
new SecondarySortBasicDriver(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************* | |
*Command to run the program | |
******************************* | |
hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************* | |
*Results | |
******************************* | |
--Source record count | |
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l | |
2246830 | |
--Results record count | |
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l | |
2246830 | |
--Files generated | |
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}' | |
sortProject/data/output-secondarySortBasic/part-r-00000 | |
sortProject/data/output-secondarySortBasic/part-r-00001 | |
sortProject/data/output-secondarySortBasic/part-r-00002 | |
sortProject/data/output-secondarySortBasic/part-r-00003 | |
sortProject/data/output-secondarySortBasic/part-r-00004 | |
sortProject/data/output-secondarySortBasic/part-r-00005 | |
sortProject/data/output-secondarySortBasic/part-r-00006 | |
sortProject/data/output-secondarySortBasic/part-r-00007 | |
--Output | |
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | |
d001 Zykh Sudhanshu 205927 | |
d001 Zykh Nidapan 452738 | |
.. | |
d001 Yoshimura Alenka 463297 | |
d001 Yeung Yuguang 483161 | |
.. | |
d001 Acton Basim 105207 | |
d001 Aamodt Sreekrishna 493601 | |
.. | |
d002 Aamodt Yakkov 43290 | |
.. | |
d003 Acton Idoia 211583 | |
.. | |
d004 dAstous Candido 59201 | |
d004 dAstous Berhard 427930 | |
.. | |
d005 Zizka Aamer 409151 | |
d005 Zirintsis Xiaoqiang 52246 | |
.... | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
********************** | |
Reference: | |
********************** | |
Hadoop: The Definitive Guide, 3rd edition
********************** | |
Credits: | |
********************** | |
Data from mysql - http://dev.mysql.com/doc/employee/en/index.html
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment