Skip to content

Instantly share code, notes, and snippets.

@HappyStorm
Last active March 28, 2016 13:30
Show Gist options
  • Save HappyStorm/65a2ea1405d9fa6fe17c to your computer and use it in GitHub Desktop.
Save HappyStorm/65a2ea1405d9fa6fe17c to your computer and use it in GitHub Desktop.
package org.apache.hadoop.examples;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TopTenMapper
extends Mapper<NullWritable, Text, NullWritable, IntWritable>{
private List<IntWritable> list = new ArrayList<IntWritable>();
public void map(NullWritable key, Text value, Context context
) throws IOException, InterruptedException {
// transform the string to integer, then add it to the list
StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
while(itr.hasMoreTokens()) {
IntWritable num = new IntWritable(Integer.parseInt(itr.nextToken()));
list.add(num);
}
// sort the list by descending order
Collections.sort(list, new Comparator<IntWritable>() {
public int compare(IntWritable a, IntWritable b) {
return a.get() - b.get();
}
});
// remove the list element until the size equals to 10
while(list.size()>10)
list.remove(list.size()-1);
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
for(int i=0; i<10; ++i)
context.write(NullWritable.get(), list.get(i));
}
}
public static class TopTenReducer
extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
private List<IntWritable> list = new ArrayList<IntWritable>();
public void reduce(NullWritable key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
// add the values to the list
for (IntWritable val : values)
list.add(val);
// sort the list by descending order
Collections.sort(list, new Comparator<IntWritable>() {
public int compare(IntWritable a, IntWritable b) {
return a.get() - b.get();
}
});
// remove the list element until the size equals to 10
while(list.size()>10)
list.remove(list.size()-1);
// sort the remaining list by ascending order
Collections.sort(list, new Comparator<IntWritable>() {
public int compare(IntWritable a, IntWritable b) {
return b.get() - a.get();
}
});
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
for(int i=0; i<10; ++i)
context.write(NullWritable.get(), list.get(i));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Top 10 Sort");
job.setJarByClass(WordCount.class);
job.setMapperClass(TopTenMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(TopTenReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment