package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {

  public static class TopTenMapper
      extends Mapper<Object, Text, NullWritable, IntWritable> {

    private List<IntWritable> list = new ArrayList<IntWritable>();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Parse each tab-separated token into an integer and add it to the list.
      StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
      while (itr.hasMoreTokens()) {
        IntWritable num = new IntWritable(Integer.parseInt(itr.nextToken()));
        list.add(num);
      }
      // Sort the list in descending order.
      Collections.sort(list, new Comparator<IntWritable>() {
        public int compare(IntWritable a, IntWritable b) {
          return b.compareTo(a);
        }
      });
      // Trim the list so that at most the ten largest values remain.
      while (list.size() > 10)
        list.remove(list.size() - 1);
    }

    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // Emit this mapper's local top ten (or fewer, if the input was small).
      for (int i = 0; i < Math.min(10, list.size()); ++i)
        context.write(NullWritable.get(), list.get(i));
    }
  }
  public static class TopTenReducer
      extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {

    private List<IntWritable> list = new ArrayList<IntWritable>();

    public void reduce(NullWritable key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // Copy each value into the list; Hadoop reuses the IntWritable object
      // across iterations, so a fresh copy is needed for every element.
      for (IntWritable val : values)
        list.add(new IntWritable(val.get()));
      // Sort the list in descending order.
      Collections.sort(list, new Comparator<IntWritable>() {
        public int compare(IntWritable a, IntWritable b) {
          return b.compareTo(a);
        }
      });
      // Trim the list so that only the ten largest values remain.
      while (list.size() > 10)
        list.remove(list.size() - 1);
      // Sort the remaining values in ascending order for output.
      Collections.sort(list, new Comparator<IntWritable>() {
        public int compare(IntWritable a, IntWritable b) {
          return a.compareTo(b);
        }
      });
    }

    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // Emit the global top ten (or fewer, if the input was small).
      for (int i = 0; i < Math.min(10, list.size()); ++i)
        context.write(NullWritable.get(), list.get(i));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Top 10 Sort");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TopTenMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setReducerClass(TopTenReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
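One possible way to launch the job, assuming the class has been packaged into a jar (the jar name and HDFS paths below are placeholders, not part of the gist). The mapper expects plain-text input with one or more tab-separated integers per line, and the output directory must not already exist:

hadoop jar topten.jar org.apache.hadoop.examples.WordCount /user/hadoop/numbers /user/hadoop/topten-out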