Forked from airawat/00-MapSideJoinDistCacheMapFile
Last active
August 29, 2015 14:09
-
-
Save risarora/43bee4dcb2a76263eea6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This gist demonstrates how to do a map-side join, joining a MapFile from distributedcache | |
with a larger dataset in HDFS. | |
Includes: | |
--------- | |
1. Input data and script download | |
2. Dataset structure review | |
3. Expected results | |
4. Mapper code | |
5. Driver code | |
6. Data load commands | |
7. Command to run Java program | |
8. Results of the program |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
01. Data and script download | |
----------------------------- | |
Google: | |
<<To be added>> | |
Email me at [email protected] if you encounter any issues | |
gitHub: | |
<<To be added>> | |
Directory structure | |
------------------- | |
joinProject | |
data | |
employees_tsv | |
employees_tsv | |
departments_map.tar.gz | |
MapSideJoin-DistCacheMapFile | |
src | |
MapperMapSideJoinDCacheMapFile.java | |
DriverMapSideJoinDCacheMapFile | |
jar | |
MapSideJoinDCacheMapFile.jar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Data structure | |
******************************************** | |
a) Small dataset (departments_map) | |
[DeptNo DeptName] - MapFile | |
d001 Marketing | |
d002 Finance | |
d003 Human Resources | |
d004 Production | |
d005 Development | |
d006 Quality Management | |
d007 Sales | |
d008 Research | |
d009 Customer Service | |
b) Large dataset (employees_tsv) | |
[Emp_no	DOB	FName	LName	Gender	HireDate	DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005 | |
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007 | |
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 | |
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004 | |
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 | |
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005 | |
10009 1952-04-19 Sumant Peac F 1985-02-18 d006 | |
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Expected Results | |
******************************************** | |
Every field of each employees_tsv record, followed by a tab and the department name (looked up from the departments MapFile) | |
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development | |
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales | |
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production | |
...... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/******************************************** | |
*Mapper | |
*MapperMapSideJoinDCacheMapFile | |
********************************************/ | |
import java.io.File; | |
import java.io.IOException; | |
import java.net.URI; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.MapFile; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class MapperMapSideJoinDCacheMapFile extends | |
Mapper<LongWritable, Text, Text, Text> { | |
private MapFile.Reader deptMapReader = null; | |
private Text txtMapOutputKey = new Text(""); | |
private Text txtMapOutputValue = new Text(""); | |
private Text txtMapLookupKey = new Text(""); | |
private Text txtMapLookupValue = new Text(""); | |
enum MYCOUNTER { | |
RECORD_COUNT, FILE_EXISTS, LOAD_MAP_ERROR | |
} | |
@Override | |
protected void setup(Context context) throws IOException, | |
InterruptedException { | |
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context | |
.getConfiguration()); | |
for (Path eachPath : cacheFilesLocal) { | |
if (eachPath.getName().toString().trim() | |
.equals("departments_map.tar.gz")) { | |
URI uriUncompressedFile = new File(eachPath.toString() | |
+ "/departments_map").toURI(); | |
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1); | |
loadDepartmentsMap(uriUncompressedFile, context); | |
} | |
} | |
} | |
@SuppressWarnings("deprecation") | |
private void loadDepartmentsMap(URI uriUncompressedFile, Context context) | |
throws IOException { | |
FileSystem dfs = FileSystem.get(context.getConfiguration()); | |
try { | |
deptMapReader = new MapFile.Reader(dfs, | |
uriUncompressedFile.toString(), context.getConfiguration()); | |
} catch (Exception e) { | |
// TODO Auto-generated catch block | |
context.getCounter(MYCOUNTER.LOAD_MAP_ERROR).increment(1); | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1); | |
if (value.toString().length() > 0) { | |
String arrEmpAttributes[] = value.toString().split("\\t"); | |
txtMapLookupKey.set(arrEmpAttributes[6].toString()); | |
try { | |
deptMapReader.get(txtMapLookupKey, txtMapLookupValue); | |
} finally { | |
txtMapLookupValue | |
.set((txtMapLookupValue.equals(null) || txtMapLookupValue | |
.equals("")) ? "NOT-FOUND" : txtMapLookupValue | |
.toString()); | |
} | |
txtMapOutputKey.set(arrEmpAttributes[0].toString()); | |
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[2].toString() + "\t" | |
+ arrEmpAttributes[3].toString() + "\t" | |
+ arrEmpAttributes[4].toString() + "\t" | |
+ arrEmpAttributes[5].toString() + "\t" | |
+ arrEmpAttributes[6].toString() + "\t" | |
+ txtMapLookupValue.toString()); | |
} | |
context.write(txtMapOutputKey, txtMapOutputValue); | |
txtMapLookupValue.set(""); | |
txtMapLookupKey.set(""); | |
} | |
@Override | |
protected void cleanup(Context context) throws IOException, | |
InterruptedException { | |
deptMapReader.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/******************************************** | |
*Driver | |
*DriverMapSideJoinDCacheMapFile | |
********************************************/ | |
import java.net.URI; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class DriverMapSideJoinDCacheMapFile extends Configured implements Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
System.out | |
.printf("Two parameters are required for DriverMapSideJoinDCacheMapFile- <input dir> <output dir>\n"); | |
return -1; | |
} | |
Job job = new Job(getConf()); | |
Configuration conf = job.getConfiguration(); | |
job.setJobName("Map-side join with mapfile in DCache"); | |
DistributedCache | |
.addCacheArchive( | |
new URI( | |
"/user/akhanolk/joinProject/data/departments_map.tar.gz"), | |
conf); | |
job.setJarByClass(DriverMapSideJoinDCacheMapFile.class); | |
FileInputFormat.setInputPaths(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.setMapperClass(MapperMapSideJoinDCacheMapFile.class); | |
job.setNumReduceTasks(0); | |
boolean success = job.waitForCompletion(true); | |
return success ? 0 : 1; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), | |
new DriverMapSideJoinDCacheMapFile(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*HDFS load commands | |
******************************************** | |
hadoop fs -mkdir joinProject | |
hadoop fs -mkdir joinProject/data | |
hadoop fs -put joinProject/data/* joinProject/data/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Job run commands | |
******************************************** | |
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
******************************************** | |
*Program Output | |
******************************************** | |
hadoop fs -cat /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache/part* | less | |
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development | |
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales | |
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production | |
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production | |
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources | |
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development | |
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management | |
.. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment