Created
June 12, 2015 14:13
-
-
Save obar1/4b1b591c516835bf4c39 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin { | |
public static class CustsMapper extends | |
Mapper<Object, Text, Text, Text> { | |
public void map(Object key, Text value, Context context) | |
throws IOException, InterruptedException { | |
// 4000001,Kristina,Chung,55,Pilot | |
String record = value.toString(); | |
String[] parts = record.split(","); | |
context.write(new Text(parts[0]), new Text("custs\t" + parts[1])); // key is th id, the common fields between the 2 files in input | |
} | |
} | |
public static class TxnsMapper extends | |
Mapper<Object, Text, Text, Text> { | |
public void map(Object key, Text value, Context context) | |
throws IOException, InterruptedException { | |
//00000000,06-26-2011,4000001,040.33,Exercise & Fitness,Cardio Machine Accessories,Clarksville,Tennessee,credit | |
String record = value.toString(); | |
String[] parts = record.split(","); | |
context.write(new Text(parts[2]), new Text("txns\t" + parts[3])); // emit id and what we wnt to join | |
} | |
} | |
public static class ReduceJoinReducer extends | |
Reducer<Text, Text, Text, Text> { | |
public void reduce(Text key, Iterable<Text> values, Context context) | |
throws IOException, InterruptedException { | |
// the reducer works on (key, L(Values)) so in the values we have custs and txns | |
String name = ""; | |
double total = 0.0; | |
int count = 0; | |
for (Text t : values) { // in values we have 1 value for the custmoer info and all the related txns related infor to the customer, because we have cust id as key for both | |
String parts[] = t.toString().split("\t"); | |
if (parts[0].equals("txns")) { // logic to undertand what is the value about | |
count++; | |
total += Float.parseFloat(parts[1]); | |
} else if (parts[0].equals("custs")) { | |
name = parts[1]; | |
} | |
} | |
String str = String.format("%d\t%f", count, total); | |
context.write(new Text(name), new Text(str)); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Configuration conf = new Configuration(); | |
Job job = new Job(conf, "Reduce-side join"); | |
job.setJarByClass(ReduceJoin.class); | |
job.setReducerClass(ReduceJoinReducer.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
// define the multile inputs and the 2 mappers | |
MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CustsMapper.class); | |
MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, TxnsMapper.class); | |
Path outputPath = new Path(args[2]); | |
FileOutputFormat.setOutputPath(job, outputPath); | |
outputPath.getFileSystem(conf).delete(outputPath); | |
System.exit(job.waitForCompletion(true) ? 0 : 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment