Skip to content

Instantly share code, notes, and snippets.

@USCSU
Created March 9, 2014 22:42
Show Gist options
  • Select an option

  • Save USCSU/9456066 to your computer and use it in GitHub Desktop.

Select an option

Save USCSU/9456066 to your computer and use it in GitHub Desktop.
MapReduce Topk
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
public class TopK extends Configured implements Tool {
private static int N = 10 ;
public static class MapClass
extends Mapper<LongWritable, Text, Text, Text> {
private TreeMap<Integer, Text> TopKMap = new TreeMap<Integer, Text>() ;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citations = value.toString().split("\\s+") ;
int value_cnt = citations[1].split(",").length ;
TopKMap.put(new Integer(value_cnt), new Text(citations[0]));
if (TopKMap.size() > N) {
TopKMap.remove(TopKMap.firstKey());
}
} //~map
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Integer k : TopKMap.keySet()) {
context.write(new Text(k.toString()), TopKMap.get(k));
}
} //~cleanup
} //~MapClass
public static class ReduceClass extends Reducer&ltText, Text, Text, Text&gt {
private static final TreeMap<Text, Text> TopKMap = new TreeMap <Text, Text>();
@Override
public void reduce (Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
TopKMap.put(new Text(key), new Text(value));
if (TopKMap.size() > N) {
TopKMap.remove(TopKMap.firstKey()) ;
}
}
} //~reduce
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Text k : TopKMap.keySet()) {
context.write(k, TopKMap.get(k));
}
} //~cleanup
} // ReduceClass
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "TopK");
job.setJarByClass(TopK.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(1);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
} //~run
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TopK(), args);
System.exit(res);
} //~main
} //~TopK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment