Hadoop MapReduce Program for the Video Game Sales Problem
package wordcount;

import java.io.File;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class VideoSales {

    /*
     * PROBLEM:
     * The data set records video game sales for NA (North America), EU, JP (Japan), other regions,
     * and worldwide (global sales), along with each game's rank, name, platform, year, genre, and publisher.
     * Using a MapReduce program, find the bottom 3 games per genre based on global sales.
     * In the output, include the name of the game, its genre, and the sales figure.
     * The output can be in any order.
     */
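
    /*
     * Worked illustration (the column order here is an assumption, since the exact layout of the
     * input file is not spelled out): the field indices used below (name = 0, genre = 3,
     * global sales = 9) imply a CSV layout of
     *     Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
     * i.e. a file without a leading Rank column; if the input does carry a Rank column,
     * every index would need to shift up by one. For a hypothetical line such as
     *     Wii Sports,Wii,2006,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
     * the mapper emits the pair ("Sports", whole line), and the reducer would later write
     * "Sports Wii Sports	82.74" if that game were among the three lowest global sales in its genre.
     */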
    public static class VideoMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split on commas that lie outside double quotes, so quoted fields containing commas stay intact.
            String[] features = value.toString().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
            String genre = features[3];
            // Emit the genre as the key and the whole record as the value,
            // so each reduce call sees every record of one genre.
            context.write(new Text(genre), new Text(value));
        }
    }
    public static class VideoReducer extends Reducer<Text, Text, Text, FloatWritable> {

        // Holds the current genre's three lowest global-sales records, sorted by sales ascending.
        private TreeMap<FloatWritable, Text> btmGlobalSales = new TreeMap<FloatWritable, Text>();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text record : values) {
                String[] features = record.toString().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
                String name = features[0];
                float globalSales = Float.parseFloat(features[9]);
                btmGlobalSales.put(new FloatWritable(globalSales), new Text(key + " " + name));
                // Keep only the three smallest sales values. Note that two games with exactly the
                // same global sales share a TreeMap key, so the later one overwrites the earlier.
                if (btmGlobalSales.size() > 3) {
                    btmGlobalSales.remove(btmGlobalSales.lastKey());
                }
            }
            // Emit the bottom three records for this genre as "<genre> <name>	<global sales>".
            for (Entry<FloatWritable, Text> entry : btmGlobalSales.entrySet()) {
                context.write(entry.getValue(), entry.getKey());
            }
            // Reset the map so the next genre starts with an empty bottom-3.
            btmGlobalSales.clear();
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Input and output paths are hard-coded for a local Windows run; the output directory is
        // deleted up front because Hadoop refuses to start a job whose output directory already exists.
        String[] arguments = {"C:\\hadoop\\datafile\\input\\videosales.txt", "C:\\hadoop\\datafile\\output\\videosales"};
        FileUtils.deleteDirectory(new File(arguments[1]));

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "VideoSales");
        job.setJarByClass(VideoSales.class);
        job.setMapperClass(VideoMapper.class);
        job.setReducerClass(VideoReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
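
The comma-splitting regular expression above is easier to follow in isolation. Below is a small, self-contained sketch (the class name and sample row are made up for illustration) showing that the split keeps a quoted field containing a comma as a single token:

// Standalone illustration with hypothetical data: splitting a CSV row on commas
// that sit outside double quotes, using the same pattern as VideoSales.
public class CsvSplitDemo {
    public static void main(String[] args) {
        String row = "\"Legend, The\",PS2,2004,Action,SomePublisher,0.10,0.05,0.00,0.01,0.16";
        // Split on a comma only when an even number of quote characters follows it,
        // i.e. when the comma is not inside a quoted field.
        String[] fields = row.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
        for (String f : fields) {
            System.out.println(f);
        }
        // The first printed field is "Legend, The" (quotes retained), followed by PS2, 2004, Action, ...
        // and the row yields exactly ten fields, confirming the embedded comma did not cause an extra split.
    }
}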