Created August 27, 2012 15:01
-
-
Save geofferyzh/3489292 to your computer and use it in GitHub Desktop.
PYMK Stage3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.io.WritableComparator; | |
// Compares the composite key | |
public class CompositeKeyComparator extends WritableComparator { | |
/*s Constructor. */ | |
protected CompositeKeyComparator() { | |
super(Text.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
Text k1 = (Text)w1; | |
Text k2 = (Text)w2; | |
String[] k1Items = k1.toString().split(":"); | |
String[] k2Items = k2.toString().split(":"); | |
String k1Base = k1Items[0]; | |
String k2Base = k2Items[0]; | |
int comp = k1Base.compareTo(k2Base); | |
if(0 == comp) { | |
comp = -1 * k1Items[1].compareTo(k2Items[1]); | |
} | |
return comp; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.*; | |
import java.lang.*; | |
// Partitions key based on "natural" key | |
public class myPartitioner<K2, V2> implements Partitioner<Text,Text> { | |
@Override | |
public void configure(JobConf job) { | |
} | |
public int getPartition(Text key, Text value, int numPartitions) { | |
String[] keyItems = key.toString().split(":"); | |
String keyBase = keyItems[0]; | |
int hash = keyBase.hashCode() & Integer.MAX_VALUE; | |
int mypartition = hash % numPartitions; | |
return mypartition; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.*; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.apache.hadoop.io.WritableComparator; | |
// Groups values based on the natural key | |
public class NaturalKeyGroupingComparator extends WritableComparator { | |
//Constructor. | |
protected NaturalKeyGroupingComparator() { | |
super(Text.class, true); | |
} | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
Text k1 = (Text)w1; | |
Text k2 = (Text)w2; | |
String[] k1Items = k1.toString().split(":"); | |
String[] k2Items = k2.toString().split(":"); | |
String k1Base = k1Items[0]; | |
String k2Base = k2Items[0]; | |
int comp = k1Base.compareTo(k2Base); | |
return comp; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.FileInputFormat; | |
import org.apache.hadoop.mapred.FileOutputFormat; | |
import org.apache.hadoop.mapred.JobClient; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class pymk3 extends Configured implements Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
System.out.printf( | |
"Usage: %s [generic options] <input dir> <output dir>\n", getClass() | |
.getSimpleName()); | |
ToolRunner.printGenericCommandUsage(System.out); | |
return -1; | |
} | |
JobConf conf = new JobConf(getConf(), pymk3.class); | |
conf.setJobName(this.getClass().getName()); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
FileOutputFormat.setOutputPath(conf, new Path(args[1])); | |
conf.setMapperClass(pymk3_mapper.class); | |
conf.setReducerClass(pymk3_reducer.class); | |
conf.setMapOutputKeyClass(Text.class); | |
conf.setMapOutputValueClass(Text.class); | |
conf.setOutputKeyClass(Text.class); | |
conf.setOutputValueClass(Text.class); | |
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class); | |
conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class); | |
conf.setPartitionerClass(myPartitioner.class); | |
// conf.setNumReduceTasks(1); | |
JobClient.runJob(conf); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new pymk3(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.*; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.mapred.MapReduceBase; | |
import org.apache.hadoop.mapred.Mapper; | |
import org.apache.hadoop.mapred.OutputCollector; | |
import org.apache.hadoop.mapred.Reporter; | |
public class pymk3_mapper extends MapReduceBase implements | |
Mapper<LongWritable, Text, Text, Text> { | |
@Override | |
public void map(LongWritable key, Text value, | |
OutputCollector<Text, Text> output, Reporter reporter) | |
throws IOException { | |
String[] str = value.toString().split("\t"); | |
String[] keystr = str[0].split(":"); | |
String[] pairstr = keystr[0].split(","); | |
output.collect(new Text(pairstr[0] + ":" + keystr[1]), new Text(pairstr[1] + ":" + str[1])); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.Iterator; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.OutputCollector; | |
import org.apache.hadoop.mapred.MapReduceBase; | |
import org.apache.hadoop.mapred.Reducer; | |
import org.apache.hadoop.mapred.Reporter; | |
public class pymk3_reducer extends MapReduceBase implements | |
Reducer<Text, Text, Text, Text> { | |
@Override | |
public void reduce(Text key, Iterator<Text> values, | |
OutputCollector<Text, Text> output, Reporter reporter) | |
throws IOException { | |
String[] keystr = key.toString().split(":"); | |
int topcnt = 0; | |
while (values.hasNext() && topcnt <20) { | |
Text value = values.next(); | |
String[] valuestr = value.toString().split(":"); | |
output.collect(new Text(keystr[0] + "," + valuestr[0]), new Text(valuestr[1] + ":" + valuestr[2])); | |
topcnt += 1; | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.