Last active
September 25, 2016 14:31
-
-
Save airawat/6600892 to your computer and use it in GitHub Desktop.
Map-side join example: Java code that joins two datasets. One is large (TSV format). The other is small reference data (a text file) made available through the DistributedCache from the command line (GenericOptionsParser).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This gist is part of a series of gists on map-side joins in Java MapReduce.
In the gist https://gist.github.com/airawat/6597557, the reference data already
in HDFS was added to the distributed cache from the driver code.
This gist instead demonstrates adding a local file to the distributed cache from the command line.
Refer to the gist at https://gist.github.com/airawat/6597557 for:
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS
The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at [email protected] if you encounter any issues
gitHub:
<<To be added>>

Directory structure
-------------------
joinProject
    data
        employees_tsv
            employees_tsv
        departments_sorted
            departments_txt
    MapSideJoin-DistCacheTxtFileGOP
        src
            MapperMapSideJoinDCacheTextFileGOP.java
            DriverMapSideJoinDCacheTxtFileGOP.java
        jar
            MapSideJoin-DistCacheTxtFileGOP.jar
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFileGOP
********************************************/
import java.io.BufferedReader; | |
import java.io.File; | |
import java.io.FileNotFoundException; | |
import java.io.FileReader; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
public class MapperMapSideJoinDCacheTextFileGOP extends | |
Mapper<LongWritable, Text, Text, Text> { | |
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>(); | |
private BufferedReader brReader; | |
private String strDeptName = ""; | |
private Text txtMapOutputKey = new Text(""); | |
private Text txtMapOutputValue = new Text(""); | |
enum MYCOUNTER { | |
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR | |
} | |
@Override | |
protected void setup(Context context) throws IOException, | |
InterruptedException { | |
File lookupFile = new File("departments_txt"); | |
String strLineRead = ""; | |
try { | |
brReader = new BufferedReader(new FileReader(lookupFile)); | |
// Read each line, split and load to HashMap | |
while ((strLineRead = brReader.readLine()) != null) { | |
String deptFieldArray[] = strLineRead.split("\\t"); | |
DepartmentMap.put(deptFieldArray[0].trim(), | |
deptFieldArray[1].trim()); | |
} | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1); | |
} catch (IOException e) { | |
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1); | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1); | |
if (value.toString().length() > 0) { | |
String arrEmpAttributes[] = value.toString().split("\\t"); | |
try { | |
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString()); | |
} finally { | |
strDeptName = ((strDeptName.equals(null) || strDeptName | |
.equals("")) ? "NOT-FOUND" : strDeptName); | |
} | |
txtMapOutputKey.set(arrEmpAttributes[0].toString()); | |
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[1].toString() + "\t" | |
+ arrEmpAttributes[2].toString() + "\t" | |
+ arrEmpAttributes[3].toString() + "\t" | |
+ arrEmpAttributes[4].toString() + "\t" | |
+ arrEmpAttributes[5].toString() + "\t" | |
+ arrEmpAttributes[6].toString() + "\t" + strDeptName); | |
} | |
context.write(txtMapOutputKey, txtMapOutputValue); | |
strDeptName = ""; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFileGOP
********************************************/
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements | |
Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
System.out | |
.printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n"); | |
return -1; | |
} | |
Job job = new Job(getConf()); | |
job.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser"); | |
job.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class); | |
FileInputFormat.setInputPaths(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class); | |
job.setNumReduceTasks(0); | |
boolean success = job.waitForCompletion(true); | |
return success ? 0 : 1; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), | |
new DriverMapSideJoinDCacheTxtFileGOP(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
********************************************
*Program Output
********************************************
See https://gist.github.com/airawat/6597557
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My notes - please disregard
scp /Users/akhanolkar/Documents/hadoop-jars/MapSideJoin-DistCacheTxtFileGOP.jar akhanolk@cdh-dev01:~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/
hadoop fs -rm -R joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP