My blog has an introduction to reduce-side joins in Java MapReduce:
http://hadooped.blogspot.com/2013/09/reduce-side-join-options-in-java-map.html
***************************
Employee Dataset
***************************
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
.......
***************************
Salary Dataset - Current
[Text format]
***************************
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
.......
***************************
Salary Dataset - Historical
[Text format]
***************************
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10001,85097,2001-06-22,2002-06-22
10001,85112,2000-06-22,2001-06-22
10001,84917,1999-06-23,2000-06-22
10001,81097,1998-06-23,1999-06-23
10001,81025,1997-06-23,1998-06-23
10001,80013,1996-06-23,1997-06-23
10001,76884,1995-06-24,1996-06-23
10001,75994,1994-06-24,1995-06-24
10001,75286,1993-06-24,1994-06-24
10001,74333,1992-06-24,1993-06-24
10001,71046,1991-06-25,1992-06-24
10001,66961,1990-06-25,1991-06-25
10001,66596,1989-06-25,1990-06-25
10001,66074,1988-06-25,1989-06-25
10001,62102,1987-06-26,1988-06-25
10001,60117,1986-06-26,1987-06-26
10002,72527,2001-08-02,9999-01-01
10002,71963,2000-08-02,2001-08-02
10002,69366,1999-08-03,2000-08-02
10002,67534,1998-08-03,1999-08-03
10002,65909,1997-08-03,1998-08-03
10002,65828,1996-08-03,1997-08-03
10003,43311,2001-12-01,9999-01-01
10003,43699,2000-12-01,2001-12-01
10003,43478,1999-12-02,2000-12-01
10003,43636,1998-12-02,1999-12-02
10003,43466,1997-12-02,1998-12-02
10003,43616,1996-12-02,1997-12-02
10003,40006,1995-12-03,1996-12-02
.......
********************************
Department Dataset
[Map file]
********************************
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+
.......
//********************************************************************************
//Class: CompositeKeyWritableRSJ
//Purpose: Custom Writable that serves as composite key
//         with attributes joinKey and sourceIndex
//Author: Anagha Khanolkar
//*********************************************************************************

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

// WritableComparable already extends Writable, so implementing it alone suffices
public class CompositeKeyWritableRSJ implements
		WritableComparable<CompositeKeyWritableRSJ> {

	// Data members
	private String joinKey; // EmployeeID
	private int sourceIndex; // 1=Employee data; 2=Salary (current) data; 3=Salary (historical) data

	public CompositeKeyWritableRSJ() {
	}

	public CompositeKeyWritableRSJ(String joinKey, int sourceIndex) {
		this.joinKey = joinKey;
		this.sourceIndex = sourceIndex;
	}

	@Override
	public String toString() {
		return joinKey + "\t" + sourceIndex;
	}

	@Override
	public void readFields(DataInput dataInput) throws IOException {
		joinKey = WritableUtils.readString(dataInput);
		sourceIndex = WritableUtils.readVInt(dataInput);
	}

	@Override
	public void write(DataOutput dataOutput) throws IOException {
		WritableUtils.writeString(dataOutput, joinKey);
		WritableUtils.writeVInt(dataOutput, sourceIndex);
	}

	@Override
	public int compareTo(CompositeKeyWritableRSJ objKeyPair) {
		int result = joinKey.compareTo(objKeyPair.joinKey);
		if (result == 0) {
			// sourceIndex is an int, so compare it as one
			result = Integer.compare(sourceIndex, objKeyPair.sourceIndex);
		}
		return result;
	}

	public String getjoinKey() {
		return joinKey;
	}

	public void setjoinKey(String joinKey) {
		this.joinKey = joinKey;
	}

	public int getsourceIndex() {
		return sourceIndex;
	}

	public void setsourceIndex(int sourceIndex) {
		this.sourceIndex = sourceIndex;
	}
}
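
A quick way to sanity-check a custom Writable like this is to round-trip it through its own serialization. The sketch below is an editor's addition rather than part of the original gist; it assumes the class above is on the classpath and lives in the same package.

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class CompositeKeyRoundTrip {
	public static void main(String[] args) throws Exception {
		CompositeKeyWritableRSJ original = new CompositeKeyWritableRSJ("10001", 2);

		// Serialize the key the same way the MapReduce framework would
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		original.write(new DataOutputStream(buffer));

		// Deserialize into a fresh instance and confirm the keys compare equal
		CompositeKeyWritableRSJ copy = new CompositeKeyWritableRSJ();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(
				buffer.toByteArray())));
		System.out.println(copy + " compareTo: " + original.compareTo(copy)); // expect 0
	}
}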
//********************************************************************************
//Class: MapperRSJ
//Purpose: Mapper
//Author: Anagha Khanolkar
//*********************************************************************************

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MapperRSJ extends
		Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {

	CompositeKeyWritableRSJ ckwKey = new CompositeKeyWritableRSJ();
	Text txtValue = new Text("");
	int intSrcIndex = 0;
	StringBuilder strMapValueBuilder = new StringBuilder("");
	List<Integer> lstRequiredAttribList = new ArrayList<Integer>();

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		// {{
		// Get the source index (1=employee, 2=current salary, 3=historical
		// salary); added as a configuration entry, keyed by file name, in the driver
		FileSplit fsFileSplit = (FileSplit) context.getInputSplit();
		intSrcIndex = Integer.parseInt(context.getConfiguration().get(
				fsFileSplit.getPath().getName()));
		// }}

		// {{
		// Initialize the list of fields to emit as output based on intSrcIndex
		if (intSrcIndex == 1) { // employee
			lstRequiredAttribList.add(2); // FName
			lstRequiredAttribList.add(3); // LName
			lstRequiredAttribList.add(4); // Gender
			lstRequiredAttribList.add(6); // DeptNo
		} else { // salary (current or historical)
			lstRequiredAttribList.add(1); // Salary
			lstRequiredAttribList.add(3); // Effective-to-date (value of
										  // 9999-01-01 indicates current salary)
		}
		// }}
	}

	private String buildMapValue(String[] arrEntityAttributesList) {
		// Returns a csv of the values to emit, based on the data entity
		strMapValueBuilder.setLength(0); // Initialize

		// Build the list of attributes to output based on source - employee/salary
		for (int i = 1; i < arrEntityAttributesList.length; i++) {
			// If the field is in the list of required output, append it
			if (lstRequiredAttribList.contains(i)) {
				strMapValueBuilder.append(arrEntityAttributesList[i]).append(",");
			}
		}
		if (strMapValueBuilder.length() > 0) {
			// Drop the trailing comma
			strMapValueBuilder.setLength(strMapValueBuilder.length() - 1);
		}

		return strMapValueBuilder.toString();
	}

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		if (value.toString().length() > 0) {
			String[] arrEntityAttributes = value.toString().split(",");

			ckwKey.setjoinKey(arrEntityAttributes[0]);
			ckwKey.setsourceIndex(intSrcIndex);
			txtValue.set(buildMapValue(arrEntityAttributes));

			context.write(ckwKey, txtValue);
		}
	}
}
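
To make the field selection concrete, here is a small standalone sketch (an editor's addition, with an illustrative input line) that mirrors the buildMapValue() logic above for an employee record:

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.util.Arrays;
import java.util.List;

public class MapValueSketch {
	public static void main(String[] args) {
		// Sample employee record (sourceIndex = 1)
		String line = "10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005";
		String[] attrs = line.split(",");

		// Same field positions the mapper's setup() registers for employees
		List<Integer> required = Arrays.asList(2, 3, 4, 6); // FName, LName, Gender, DeptNo

		StringBuilder value = new StringBuilder();
		for (int i = 1; i < attrs.length; i++) {
			if (required.contains(i)) {
				value.append(attrs[i]).append(",");
			}
		}
		value.setLength(value.length() - 1); // drop the trailing comma

		// Prints: Georgi,Facello,M,d005 - emitted under key (10001, 1)
		System.out.println(value);
	}
}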
//********************************************************************************
//Class: PartitionerRSJ
//Purpose: Custom partitioner
//Author: Anagha Khanolkar
//*********************************************************************************

package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionerRSJ extends Partitioner<CompositeKeyWritableRSJ, Text> {

	@Override
	public int getPartition(CompositeKeyWritableRSJ key, Text value,
			int numReduceTasks) {
		// Partitions on joinKey (EmployeeID); mask the sign bit because
		// String.hashCode() can be negative and partition numbers must be >= 0
		return (key.getjoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}
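
One subtlety worth calling out: Java's % operator keeps the sign of the dividend, and String.hashCode() can be negative, which is why the partitioner above masks the sign bit. A minimal standalone demonstration (an editor's addition):

package khanolkar.mapreduce.join.samples.reducesidejoin;

public class PartitionSketch {
	public static void main(String[] args) {
		int numReduceTasks = 4;
		int hash = -7; // stand-in for a negative String.hashCode()

		// Without the mask, the remainder keeps the sign of the dividend
		System.out.println(hash % numReduceTasks); // -3: not a valid partition

		// Masking the sign bit keeps the result in 0..numReduceTasks-1
		System.out.println((hash & Integer.MAX_VALUE) % numReduceTasks); // 1
	}
}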
package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//********************************************************************************
//Class: SortingComparatorRSJ
//Purpose: Sorting comparator
//Author: Anagha Khanolkar
//*********************************************************************************

public class SortingComparatorRSJ extends WritableComparator {

	protected SortingComparatorRSJ() {
		super(CompositeKeyWritableRSJ.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		// Sort on all attributes of the composite key:
		// joinKey first, then sourceIndex
		CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
		CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;

		int cmpResult = key1.getjoinKey().compareTo(key2.getjoinKey());
		if (cmpResult == 0) { // same joinKey
			// sourceIndex is an int, so compare it as one
			return Integer.compare(key1.getsourceIndex(), key2.getsourceIndex());
		}
		return cmpResult;
	}
}
package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//********************************************************************************
//Class: GroupingComparatorRSJ
//Purpose: For use as grouping comparator
//Author: Anagha Khanolkar
//*********************************************************************************

public class GroupingComparatorRSJ extends WritableComparator {

	protected GroupingComparatorRSJ() {
		super(CompositeKeyWritableRSJ.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		// Group on the joinKey (EmployeeID) alone, ignoring sourceIndex
		CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
		CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
		return key1.getjoinKey().compareTo(key2.getjoinKey());
	}
}
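
Taken together, the two comparators implement the secondary sort: keys sharing a joinKey are ordered by sourceIndex, yet land in the same reduce group. The sketch below (an editor's addition; it must live in the same package because the comparator constructors are protected) illustrates the difference:

package khanolkar.mapreduce.join.samples.reducesidejoin;

public class ComparatorSketch {
	public static void main(String[] args) {
		CompositeKeyWritableRSJ empKey = new CompositeKeyWritableRSJ("10001", 1);
		CompositeKeyWritableRSJ salKey = new CompositeKeyWritableRSJ("10001", 2);

		// Sorting comparator: same joinKey, so sourceIndex breaks the tie -
		// employee data (1) sorts ahead of salary data (2)
		System.out.println(new SortingComparatorRSJ().compare(empKey, salKey)); // -1

		// Grouping comparator: only the joinKey is compared, so both keys
		// fall into the same reduce() call
		System.out.println(new GroupingComparatorRSJ().compare(empKey, salKey)); // 0
	}
}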
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//********************************************************************************
//Class: ReducerRSJ
//Purpose: Reducer
//Author: Anagha Khanolkar
//*********************************************************************************

public class ReducerRSJ extends
		Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {

	StringBuilder reduceValueBuilder = new StringBuilder("");
	NullWritable nullWritableKey = NullWritable.get();
	Text reduceOutputValue = new Text("");
	String strSeparator = ",";
	private MapFile.Reader deptMapReader = null;
	Text txtMapFileLookupKey = new Text("");
	Text txtMapFileLookupValue = new Text("");

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		// {{
		// Get side data from the distributed cache
		Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
				.getConfiguration());
		for (Path eachPath : cacheFilesLocal) {
			if (eachPath.getName().toString().trim()
					.equals("departments_map.tar.gz")) {
				URI uriUncompressedFile = new File(eachPath.toString()
						+ "/departments_map").toURI();
				initializeDepartmentsMap(uriUncompressedFile, context);
			}
		}
		// }}
	}

	@SuppressWarnings("deprecation")
	private void initializeDepartmentsMap(URI uriUncompressedFile,
			Context context) throws IOException {
		// {{
		// Initialize the reader of the map file (side data)
		FileSystem dfs = FileSystem.get(context.getConfiguration());
		try {
			deptMapReader = new MapFile.Reader(dfs,
					uriUncompressedFile.toString(), context.getConfiguration());
		} catch (Exception e) {
			e.printStackTrace();
		}
		// }}
	}

	private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
			StringBuilder reduceValueBuilder, Text value) {

		if (key.getsourceIndex() == 1) {
			// Employee data
			// {{
			// Insert the joinKey (empNo) at the beginning of the stringBuilder,
			// then look up the department name in the MapFile from the
			// distributed cache
			reduceValueBuilder.append(key.getjoinKey()).append(strSeparator);

			String arrEmpAttributes[] = value.toString().split(",");
			txtMapFileLookupKey.set(arrEmpAttributes[3]); // DeptNo
			try {
				deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
			} catch (Exception e) {
				txtMapFileLookupValue.set("");
			} finally {
				// Text.equals(null) is always false, so test for an empty
				// lookup result via toString() instead
				if (txtMapFileLookupValue.toString().isEmpty()) {
					txtMapFileLookupValue.set("NOT-FOUND");
				}
			}
			// }}

			// {{
			// Append the department name to the map values to form a complete
			// CSV of employee attributes
			reduceValueBuilder.append(value.toString()).append(strSeparator)
					.append(txtMapFileLookupValue.toString())
					.append(strSeparator);
			// }}
		} else if (key.getsourceIndex() == 2) {
			// Current salary data (1..1 on join key);
			// just append the salary, drop the effective-to-date
			String arrSalAttributes[] = value.toString().split(",");
			reduceValueBuilder.append(arrSalAttributes[0]).append(strSeparator);
		} else {
			// key.getsourceIndex() == 3; historical salary data (1..many on
			// join key)
			// {{
			// Extract only the current salary (to_date = '9999-01-01')
			String arrSalAttributes[] = value.toString().split(",");
			if (arrSalAttributes[1].equals("9999-01-01")) {
				reduceValueBuilder.append(arrSalAttributes[0]).append(
						strSeparator);
			}
			// }}
		}

		// {{
		// Reset the lookup key/value for the next record
		txtMapFileLookupKey.set("");
		txtMapFileLookupValue.set("");
		// }}

		return reduceValueBuilder;
	}

	@Override
	public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {
		// Iterate through the values; the first is the csv of employee data,
		// the rest are salary data - already in that order by virtue of the
		// secondary sort; append each value
		for (Text value : values) {
			buildOutputValue(key, reduceValueBuilder, value);
		}

		// Drop the trailing comma, set the value, and emit output
		if (reduceValueBuilder.length() > 1) {
			reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
			reduceOutputValue.set(reduceValueBuilder.toString());
			context.write(nullWritableKey, reduceOutputValue);
		} else {
			// Debug aid: surface keys that produced no output
			System.out.println("Key=" + key.getjoinKey() + " src="
					+ key.getsourceIndex());
		}

		// Reset variables
		reduceValueBuilder.setLength(0);
		reduceOutputValue.set("");
	}

	@Override
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
		if (deptMapReader != null) {
			deptMapReader.close();
		}
	}
}
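
The reducer expects the side data to be a Hadoop MapFile named departments_map, packaged as departments_map.tar.gz. As an editor's sketch (the output path and sample rows are illustrative, not from the original gist), such a MapFile could be built as follows; note that MapFile keys must be appended in sorted order:

package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class DepartmentsMapFileWriter {
	@SuppressWarnings("deprecation")
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		// A MapFile is a sorted SequenceFile plus an index, so keys must be
		// appended in sorted order
		MapFile.Writer writer = new MapFile.Writer(conf, fs,
				"departments_map", Text.class, Text.class);
		writer.append(new Text("d001"), new Text("Marketing"));
		writer.append(new Text("d005"), new Text("Development"));
		writer.append(new Text("d007"), new Text("Sales"));
		writer.close();
	}
}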
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//********************************************************************************
//Class: DriverRSJ
//Purpose: Driver for Reduce Side Join of two datasets
//         with a 1..1 or 1..many cardinality on join key
//Author: Anagha Khanolkar
//*********************************************************************************

public class DriverRSJ extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		// {{
		// Exit the job if the required arguments have not been provided
		if (args.length != 3) {
			System.out
					.printf("Three parameters are required for DriverRSJ- <input dir1> <input dir2> <output dir>\n");
			return -1;
		}
		// }}

		// {{
		// Job instantiation
		Job job = new Job(getConf());
		Configuration conf = job.getConfiguration();
		job.setJarByClass(DriverRSJ.class);
		job.setJobName("ReduceSideJoin");
		// }}

		// {{
		// Add side data to the distributed cache
		DistributedCache.addCacheArchive(new URI(
				"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
				conf);
		// }}

		// {{
		// Set the sourceIndex for each input file;
		// sourceIndex is an attribute of the composite key,
		// used to drive sort order and to reference the source.
		// Can be done dynamically; file names are hard-coded for simplicity
		conf.setInt("part-e", 1); // Employee file
		conf.setInt("part-sc", 2); // Current salary file
		conf.setInt("part-sh", 3); // Historical salary file
		// }}

		// {{
		// Build a csv list of the input files
		StringBuilder inputPaths = new StringBuilder();
		inputPaths.append(args[0]).append(",").append(args[1]);
		// }}

		// {{
		// Configure the remaining aspects of the job
		FileInputFormat.setInputPaths(job, inputPaths.toString());
		FileOutputFormat.setOutputPath(job, new Path(args[2]));

		job.setMapperClass(MapperRSJ.class);
		job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
		job.setMapOutputValueClass(Text.class);

		job.setPartitionerClass(PartitionerRSJ.class);
		job.setSortComparatorClass(SortingComparatorRSJ.class);
		job.setGroupingComparatorClass(GroupingComparatorRSJ.class);

		job.setNumReduceTasks(4);
		job.setReducerClass(ReducerRSJ.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		// }}

		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new DriverRSJ(),
				args);
		System.exit(exitCode);
	}
}
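
A typical invocation would look something like the following; the jar name and directory layout are illustrative (whatever the two input directories contain must include files named part-e, part-sc, and part-sh, to match the conf.setInt() calls above):

$ hadoop jar joinProject.jar khanolkar.mapreduce.join.samples.reducesidejoin.DriverRSJ \
    joinProject/data/employees_active joinProject/data/salaries joinProject/output/RSJ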
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Without using any join optimizations
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo;
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Using a merge join (both inputs ordered on the join key)
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
sortedEmpDS = ORDER empDS by empNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
sortedSalDS = ORDER salDS by empNo;
joinedDS = join sortedEmpDS by empNo, sortedSalDS by empNo using 'merge';
finalDS = foreach joinedDS generate sortedEmpDS::empNo,sortedEmpDS::fName,sortedEmpDS::lName,sortedEmpDS::gender,sortedEmpDS::deptNo,sortedSalDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
**********************
Output of pig script
**********************
$ hadoop fs -cat joinProject/output/pig-RSJ/part* | less
10001 Facello Georgi M d005 88958
10002 Simmel Bezalel F d007 72527
10003 Bamford Parto M d004 43311
10004 Koblick Chirstian M d004 74057
.........