-
-
Save ntk148v/d0f1a04796611d843f03356f2a76a604 to your computer and use it in GitHub Desktop.
Map-side join example
- Java code for joining two datasets - one large (tsv format), and one with lookup data (text), made available through DistributedCache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This gist demonstrates how to do a map-side join, loading one small dataset from DistributedCache into a HashMap | |
in memory, and joining with a larger dataset. | |
Includes: | |
--------- | |
1. Input data and script download | |
2. Dataset structure review | |
3. Expected results | |
4. Mapper code | |
5. Driver code | |
6. Data load commands | |
7. Command to run Java program | |
8. Results of the program |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
01. Data and script download | |
----------------------------- | |
Google: | |
<<To be added>> | |
Email me at [email protected] if you encounter any issues | |
gitHub: | |
<<To be added>> | |
Directory structure | |
------------------- | |
joinProject | |
data | |
employees_tsv | |
employees_tsv | |
departments_sorted | |
departments_txt | |
MapSideJoin-DistCacheTxtFile | |
src | |
MapperMapSideJoinDCacheTextFile.java | |
DriverMapSideJoinDCacheTxtFile | |
jar | |
MapSideJoinDCacheTextFile.jar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Data structure | |
******************************************** | |
a) Small dataset (departments_txt) | |
[DeptNo DeptName] - Tab separated | |
d001 Marketing | |
d002 Finance | |
d003 Human Resources | |
d004 Production | |
d005 Development | |
d006 Quality Management | |
d007 Sales | |
d008 Research | |
d009 Customer Service | |
b) Large dataset (employees_tsv) | |
[Emp_no DOB FName LName HireDate DeptNo] - Tab separated | |
10001 1953-09-02 Georgi Facello M 1986-06-26 d005 | |
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007 | |
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 | |
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004 | |
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 | |
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005 | |
10009 1952-04-19 Sumant Peac F 1985-02-18 d006 | |
... | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Expected Results | |
******************************************** | |
Everything in employees_tsv file followed by a tab and the department name(from the department file) | |
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development | |
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales | |
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production | |
...... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/******************************************** | |
*Mapper | |
*MapperMapSideJoinDCacheTextFile | |
********************************************/ | |
import java.io.BufferedReader; | |
import java.io.FileNotFoundException; | |
import java.io.FileReader; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class MapperMapSideJoinDCacheTextFile extends | |
Mapper<LongWritable, Text, Text, Text> { | |
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>(); | |
private BufferedReader brReader; | |
private String strDeptName = ""; | |
private Text txtMapOutputKey = new Text(""); | |
private Text txtMapOutputValue = new Text(""); | |
enum MYCOUNTER { | |
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR | |
} | |
@Override | |
protected void setup(Context context) throws IOException, | |
InterruptedException { | |
Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context | |
.getConfiguration()); | |
for (Path eachPath : cacheFilesLocal) { | |
if (eachPath.getName().toString().trim().equals("departments_txt")) { | |
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1); | |
loadDepartmentsHashMap(eachPath, context); | |
} | |
} | |
} | |
private void loadDepartmentsHashMap(Path filePath, Context context) | |
throws IOException { | |
String strLineRead = ""; | |
try { | |
brReader = new BufferedReader(new FileReader(filePath.toString())); | |
// Read each line, split and load to HashMap | |
while ((strLineRead = brReader.readLine()) != null) { | |
String deptFieldArray[] = strLineRead.split("\\t"); | |
DepartmentMap.put(deptFieldArray[0].trim(), | |
deptFieldArray[1].trim()); | |
} | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1); | |
} catch (IOException e) { | |
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1); | |
e.printStackTrace(); | |
}finally { | |
if (brReader != null) { | |
brReader.close(); | |
} | |
} | |
} | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1); | |
if (value.toString().length() > 0) { | |
String arrEmpAttributes[] = value.toString().split("\\t"); | |
try { | |
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString()); | |
} finally { | |
strDeptName = ((strDeptName.equals(null) || strDeptName | |
.equals("")) ? "NOT-FOUND" : strDeptName); | |
} | |
txtMapOutputKey.set(arrEmpAttributes[0].toString()); | |
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[2].toString() + "\t" | |
+ arrEmpAttributes[3].toString() + "\t" | |
+ arrEmpAttributes[4].toString() + "\t" | |
+ arrEmpAttributes[5].toString() + "\t" | |
+ arrEmpAttributes[6].toString() + "\t" + strDeptName); | |
} | |
context.write(txtMapOutputKey, txtMapOutputValue); | |
strDeptName = ""; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/******************************************** | |
*Driver | |
*DriverMapSideJoinDCacheTxtFile | |
********************************************/ | |
import java.net.URI; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
System.out | |
.printf("Two parameters are required- <input dir> <output dir>\n"); | |
return -1; | |
} | |
Job job = new Job(getConf()); | |
Configuration conf = job.getConfiguration(); | |
job.setJobName("Map-side join with text lookup file in DCache"); | |
DistributedCache | |
.addCacheFile( | |
new URI( | |
"/user/akhanolk/joinProject/data/departments_sorted/departments_txt"), | |
conf); | |
job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class); | |
FileInputFormat.setInputPaths(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.setMapperClass(MapperMapSideJoinDCacheTextFile.class); | |
job.setNumReduceTasks(0); | |
boolean success = job.waitForCompletion(true); | |
return success ? 0 : 1; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), | |
new DriverMapSideJoinDCacheTxtFile(), args); | |
System.exit(exitCode); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*HDFS load commands | |
******************************************** | |
hadoop fs -mkdir joinProject | |
hadoop fs -mkdir joinProject/data | |
hadoop fs -put joinProject/data/* joinProject/data/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Job run commands | |
******************************************** | |
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Program Output | |
******************************************** | |
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less | |
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development | |
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales | |
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production | |
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production | |
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources | |
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development | |
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management | |
10010 1963-06-01 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management | |
10012 1960-10-04 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development | |
10013 1963-06-07 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources | |
..... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment