**********************
Gist
**********************
This gist details how to inner join two large datasets on the map side, leveraging the join
capability built into MapReduce. Such a join makes sense when both input datasets are too large
to distribute through the DistributedCache, and it can be implemented when both datasets share a
join key and both are sorted in the same order by that key.
There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the key mapred.join.expr must be set to a valid join specification (an example of the
  composed expression appears below).
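For reference, the value that CompositeInputFormat.compose() produces for mapred.join.expr
looks roughly like the following (an illustration with abbreviated paths; the exact rendering
may vary by Hadoop version):

inner(tbl(KeyValueLongInputFormat,"joinProject/data/employees_sorted"),
      tbl(KeyValueLongInputFormat,"joinProject/data/salaries_sorted"))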
Sample program:
Covers an inner join of employee and salary data, with employee ID as the join key, in a
map-only program.
Inner join:
The inner join is a traditional database-style inner join. The map method is called with a
key/value set only if every dataset in the join contains the key. The TupleWritable value
contains one value from every dataset in the join, join key excluded.
Key code in the sample program:

conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
    KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
Old API:
I ended up using the old API, as the new API does not include CompositeInputFormat in the
version of Hadoop I am running.
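(Later Hadoop releases do ship a new-API equivalent; whether it is available depends on your
version, so treat this as an assumption to verify before relying on it:)

import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;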
*******************************
Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at [email protected] if you encounter any issues.
Directory structure
-------------------
joinProject
  data
    employees_sorted
      part-e
    salaries_sorted
      part-s
  MapSideJoinLargeDatasets
    src
      KeyValueLongInputFormat.java
      KeyValueLongLineRecordReader.java
      MapperMapSideJoinLargeDatasets.java
      DriverMapSideJoinLargeDatasets.java
    jar
      MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are the employee and salary datasets.
Join key:
The join key is EmpNo (employee number).
Location of join key:
The join key is the first field in both datasets.
Sorting:
The data is sorted by the join key, EmpNo, in ascending order.
Sorting is crucial for the correctness of the join.
File format:
The files are in text format, with a comma as the field separator.
Cardinality:
1..1 on the join key; both datasets have the same number of records.
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....

Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only part of the data getting
joined. I attributed this to the fact that EmpNo is numeric, and the sort ordering is not
honored when the attribute is treated as Text: Text keys compare lexicographically, byte by
byte, so the merge that CompositeInputFormat performs can miss matches that a numeric
comparison would find. Others had encountered the same issue, and one individual had created a
custom input format, KeyValueLongInputFormat, and an associated record reader. This gist uses
the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
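A quick illustration of why the key type matters (not part of the original gist; a minimal
sketch using the Hadoop Writable types):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class KeyOrderDemo {
  public static void main(String[] args) {
    // Text compares bytes lexicographically: '9' > '1', so "9999" sorts after "10000"
    System.out.println(new Text("9999").compareTo(new Text("10000")));  // positive
    // LongWritable compares numerically: 9999 sorts before 10000
    System.out.println(new LongWritable(9999L).compareTo(new LongWritable(10000L)));  // negative
  }
}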
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class KeyValueLongLineRecordReader implements
    RecordReader<LongWritable, Text> {

  private final LineRecordReader lineRecordReader;
  private byte separator = (byte) ',';
  private LongWritable dummyKey;
  private Text innerValue;

  public Class<?> getKeyClass() {
    return LongWritable.class;
  }

  public LongWritable createKey() {
    return new LongWritable();
  }

  public Text createValue() {
    return new Text();
  }

  public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
      throws IOException {
    lineRecordReader = new LineRecordReader(job, split);
    dummyKey = lineRecordReader.createKey();
    innerValue = lineRecordReader.createValue();
    // The separator defaults to a comma, matching the input files
    String sepStr = job.get("key.value.separator.in.input.line", ",");
    this.separator = (byte) sepStr.charAt(0);
  }

  /** Returns the index of the first separator byte, or -1 if absent. */
  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
    for (int i = start; i < (start + length); i++) {
      if (utf[i] == sep) {
        return i;
      }
    }
    return -1;
  }

  /** Read a key/value pair from a line: the long before the first separator
   *  becomes the key; the remainder of the line becomes the value. */
  public synchronized boolean next(LongWritable key, Text value)
      throws IOException {
    LongWritable tKey = key;
    Text tValue = value;
    byte[] line = null;
    int lineLen = -1;
    if (lineRecordReader.next(dummyKey, innerValue)) {
      line = innerValue.getBytes();
      lineLen = innerValue.getLength();
    } else {
      return false;
    }
    if (line == null)
      return false;
    int pos = findSeparator(line, 0, lineLen, this.separator);
    if (pos == -1) {
      // No separator: the whole line is the key, the value is empty
      tKey.set(Long.valueOf(new String(line, 0, lineLen)));
      tValue.set("");
    } else {
      int keyLen = pos;
      byte[] keyBytes = new byte[keyLen];
      System.arraycopy(line, 0, keyBytes, 0, keyLen);
      int valLen = lineLen - keyLen - 1;
      byte[] valBytes = new byte[valLen];
      System.arraycopy(line, pos + 1, valBytes, 0, valLen);
      tKey.set(Long.valueOf(new String(keyBytes)));
      tValue.set(valBytes);
    }
    return true;
  }

  public float getProgress() {
    return lineRecordReader.getProgress();
  }

  public synchronized long getPos() throws IOException {
    return lineRecordReader.getPos();
  }

  public synchronized void close() throws IOException {
    lineRecordReader.close();
  }
}
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class KeyValueLongInputFormat extends
    FileInputFormat<LongWritable, Text> implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;

  @Override
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return compressionCodecs.getCodec(file) == null;
  }

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit genericSplit, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
  }
}
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
    Mapper<LongWritable, TupleWritable, Text, Text> {

  Text txtKey = new Text("");
  Text txtValue = new Text("");

  @Override
  public void map(LongWritable key, TupleWritable value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    if (value.toString().length() > 0) {
      txtKey.set(key.toString());
      // Tuple positions follow the compose() order:
      // 0 = employee record, 1 = salary record (join key excluded)
      String arrEmpAttributes[] = value.get(0).toString().split(",");
      String arrSalAttributes[] = value.get(1).toString().split(",");
      // Emit FName, LName and Salary, tab-separated
      txtValue.set(arrEmpAttributes[1] + "\t" + arrEmpAttributes[2] + "\t"
          + arrSalAttributes[0]);
      output.collect(txtKey, txtValue);
    }
  }
}
/**********************************
*DriverMapSideJoinLargeDatasets.java
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverMapSideJoinLargeDatasets {

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf("DriverMapSideJoinLargeDatasets");
    conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);
    String[] jobArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    Path dirEmployeesData = new Path(jobArgs[0]);
    Path dirSalaryData = new Path(jobArgs[1]);
    Path dirOutput = new Path(jobArgs[2]);
    conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);
    // Engage the join: CompositeInputFormat plus a join expression
    conf.setInputFormat(CompositeInputFormat.class);
    String strJoinStmt = CompositeInputFormat.compose("inner",
        KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
    conf.set("mapred.join.expr", strJoinStmt);
    // Map-only job
    conf.setNumReduceTasks(0);
    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, dirOutput);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    // runJob blocks until the job completes; the loop below is a
    // safeguard and normally never iterates
    RunningJob job = JobClient.runJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(1000);
    }
    System.exit(job.isSuccessful() ? 0 : 2);
  }
}
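The jar used below ships with the gist; if you need to rebuild it, something along these lines
should work (a sketch, assuming the hadoop launcher is on your PATH and you are inside
joinProject/MapSideJoinLargeDatasets):

mkdir -p build
javac -classpath `hadoop classpath` -d build src/*.java
jar cvf jar/MapSideJoinLgDsOAPI.jar -C build .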
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
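To sanity-check the upload (optional; -lsr is the recursive listing flag in older Hadoop
releases, while newer ones use -ls -R):

hadoop fs -lsr joinProject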
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...
$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop: The Definitive Guide
Data-Intensive Text Processing with MapReduce
Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
Data:
The data in this solution is from MySQL - http://dev.mysql.com/doc/employee/en/index.html
*******************************
Pig - map-side join of datasets
with cardinality of 1..1, using
'replicated' or 'merge' (if sorted)
*******************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e'
    using PigStorage(',')
    as (empNo:chararray, dOB:chararray, lName:chararray, fName:chararray,
        gender:chararray, hireDate:chararray, deptNo:chararray);

empDS = foreach rawEmpDS generate empNo, fName, lName, gender, deptNo;

rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc'
    using PigStorage(',')
    as (empNo:chararray, salary:long, fromDate:chararray, toDate:chararray);

salDS = foreach rawSalDS generate empNo, salary;

joinedDS = join empDS by empNo, salDS by empNo using 'replicated';

finalDS = foreach joinedDS generate empDS::empNo, empDS::fName, empDS::lName,
    empDS::gender, empDS::deptNo, salDS::salary;

store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';
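Since a replicated join loads the right-hand relation into memory, the 'merge' variant named
in the header is the better fit when both inputs are large but sorted. A sketch of the one
line that changes (Pig requires both relations sorted on the join key for 'merge'):

joinedDS = join empDS by empNo, salDS by empNo using 'merge';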