Created
March 9, 2014 22:42
-
-
Save USCSU/9456066 to your computer and use it in GitHub Desktop.
MapReduce Topk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.IOException; | |
| import java.util.*; | |
| import org.apache.hadoop.conf.*; | |
| import org.apache.hadoop.io.*; | |
| import org.apache.hadoop.mapreduce.Job; | |
| import org.apache.hadoop.mapreduce.Mapper; | |
| import org.apache.hadoop.mapreduce.Reducer; | |
| import org.apache.hadoop.mapreduce.lib.input.*; | |
| import org.apache.hadoop.mapreduce.lib.output.*; | |
| import org.apache.hadoop.util.*; | |
| import org.apache.hadoop.fs.Path; | |
| public class TopK extends Configured implements Tool { | |
| private static int N = 10 ; | |
| public static class MapClass | |
| extends Mapper<LongWritable, Text, Text, Text> { | |
| private TreeMap<Integer, Text> TopKMap = new TreeMap<Integer, Text>() ; | |
| public void map(LongWritable key, Text value, Context context) | |
| throws IOException, InterruptedException { | |
| String[] citations = value.toString().split("\\s+") ; | |
| int value_cnt = citations[1].split(",").length ; | |
| TopKMap.put(new Integer(value_cnt), new Text(citations[0])); | |
| if (TopKMap.size() > N) { | |
| TopKMap.remove(TopKMap.firstKey()); | |
| } | |
| } //~map | |
| protected void cleanup(Context context) throws IOException, InterruptedException { | |
| for (Integer k : TopKMap.keySet()) { | |
| context.write(new Text(k.toString()), TopKMap.get(k)); | |
| } | |
| } //~cleanup | |
| } //~MapClass | |
| public static class ReduceClass extends Reducer<Text, Text, Text, Text> { | |
| private static final TreeMap<Text, Text> TopKMap = new TreeMap <Text, Text>(); | |
| @Override | |
| public void reduce (Text key, Iterable<Text> values, Context context) | |
| throws IOException, InterruptedException { | |
| for (Text value : values) { | |
| TopKMap.put(new Text(key), new Text(value)); | |
| if (TopKMap.size() > N) { | |
| TopKMap.remove(TopKMap.firstKey()) ; | |
| } | |
| } | |
| } //~reduce | |
| @Override | |
| protected void cleanup(Context context) throws IOException, InterruptedException { | |
| for (Text k : TopKMap.keySet()) { | |
| context.write(k, TopKMap.get(k)); | |
| } | |
| } //~cleanup | |
| } // ReduceClass | |
| public int run(String[] args) throws Exception { | |
| Configuration conf = getConf(); | |
| Job job = new Job(conf, "TopK"); | |
| job.setJarByClass(TopK.class); | |
| Path in = new Path(args[0]); | |
| Path out = new Path(args[1]); | |
| FileInputFormat.setInputPaths(job, in); | |
| FileOutputFormat.setOutputPath(job, out); | |
| job.setMapperClass(MapClass.class); | |
| job.setReducerClass(ReduceClass.class); | |
| job.setNumReduceTasks(1); | |
| job.setInputFormatClass(TextInputFormat.class); | |
| job.setOutputFormatClass(TextOutputFormat.class); | |
| job.setOutputKeyClass(Text.class); | |
| job.setOutputValueClass(Text.class); | |
| System.exit(job.waitForCompletion(true)?0:1); | |
| return 0; | |
| } //~run | |
| public static void main(String[] args) throws Exception { | |
| int res = ToolRunner.run(new Configuration(), new TopK(), args); | |
| System.exit(res); | |
| } //~main | |
| } //~TopK |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment