Created November 26, 2013 17:10
-
-
Save abshingate/7662128 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.abhijit.mr.sequence.number; | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.io.Writable; | |
public class MRSeqNumCompositeValue implements Writable, | |
Comparable<MRSeqNumCompositeValue> { | |
private int taskId; | |
private long taskOutputRecordCount; | |
private int targetParitionNumber = -1; | |
private Text value; | |
@Override | |
public void write(DataOutput out) throws IOException { | |
out.writeInt(taskId); | |
out.writeLong(taskOutputRecordCount); | |
value.write(out); | |
} | |
@Override | |
public void readFields(DataInput in) throws IOException { | |
this.taskId = in.readInt(); | |
this.taskOutputRecordCount = in.readLong(); | |
value = new Text(); | |
value.readFields(in); | |
} | |
public int getTaskId() { | |
return taskId; | |
} | |
public void setTaskId(int taskId) { | |
this.taskId = taskId; | |
} | |
public long getTaskOutputRecordCount() { | |
return taskOutputRecordCount; | |
} | |
public void setTaskOutputRecordCount(long taskOutputRecordCount) { | |
this.taskOutputRecordCount = taskOutputRecordCount; | |
} | |
public Text getValue() { | |
return value; | |
} | |
public void setValue(Text value) { | |
this.value = value; | |
} | |
public int getTargetParitionNumber() { | |
return targetParitionNumber; | |
} | |
public void setTargetParitionNumber(int targetParitionNumber) { | |
this.targetParitionNumber = targetParitionNumber; | |
} | |
@Override | |
public int compareTo(MRSeqNumCompositeValue other) { | |
return Integer.valueOf(this.taskId).compareTo( | |
Integer.valueOf(other.taskId)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.abhijit.mr.sequence.number; | |
import java.io.IOException; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class MRSeqNumMapper extends | |
Mapper<LongWritable, Text, LongWritable, MRSeqNumCompositeValue> { | |
private static final Logger LOG = LoggerFactory | |
.getLogger(MRSeqNumMapper.class); | |
private long counter = 0; | |
private int reduceTaskCount = -1; | |
private MRSeqNumCompositeValue seqNumCompositeValue = new MRSeqNumCompositeValue(); | |
@Override | |
protected void setup(Context context) throws IOException, | |
InterruptedException { | |
int thisTaskId = context.getTaskAttemptID().getTaskID().getId(); | |
LOG.info("task id = {}", thisTaskId); | |
this.reduceTaskCount = context.getConfiguration().getInt( | |
"mapred.reduce.tasks", -1); | |
LOG.info("reduce task count = {}", reduceTaskCount); | |
seqNumCompositeValue.setTaskId(thisTaskId); | |
seqNumCompositeValue.setTargetParitionNumber(thisTaskId); | |
} | |
@Override | |
protected void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
counter++; | |
seqNumCompositeValue.setValue(value); | |
context.write(key, seqNumCompositeValue); | |
} | |
@Override | |
protected void cleanup(Context context) throws IOException, | |
InterruptedException { | |
seqNumCompositeValue.setValue(new Text("")); | |
LOG.info("map output records count = {}", counter); | |
seqNumCompositeValue.setTaskOutputRecordCount(counter); | |
for (int i = 0; i < reduceTaskCount; i++) { | |
seqNumCompositeValue.setTargetParitionNumber(i); | |
context.write(new LongWritable(-1), seqNumCompositeValue); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* | |
*/ | |
package com.abhijit.mr.sequence.number; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.mapreduce.Partitioner; | |
/** | |
* @author ashing0 | |
* | |
*/ | |
public class MRSeqNumPartitioner extends | |
Partitioner<LongWritable, MRSeqNumCompositeValue> { | |
@Override | |
public int getPartition(LongWritable key, MRSeqNumCompositeValue value, | |
int numPartitions) { | |
return value.getTargetParitionNumber(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.abhijit.mr.sequence.number; | |
import java.io.IOException; | |
import java.util.Iterator; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class MRSeqNumReducer extends | |
Reducer<LongWritable, MRSeqNumCompositeValue, LongWritable, Text> { | |
private static final Logger LOG = LoggerFactory | |
.getLogger(MRSeqNumMapper.class); | |
private long startSeqNumber = -1; | |
private int thisTaskId = -1; | |
private LongWritable longWritable = new LongWritable(); | |
@Override | |
protected void setup(Context context) throws IOException, | |
InterruptedException { | |
thisTaskId = context.getTaskAttemptID().getTaskID().getId(); | |
LOG.info("task id = {}", thisTaskId); | |
} | |
@Override | |
protected void reduce(LongWritable key, | |
Iterable<MRSeqNumCompositeValue> values, Context context) | |
throws IOException, InterruptedException { | |
LOG.info("got key = {}", key.get()); | |
if (key.get() == -1) { | |
Iterator<MRSeqNumCompositeValue> iterator = values.iterator(); | |
int reduceTaskCount = context.getConfiguration().getInt( | |
"mapred.reduce.tasks", -1); | |
long[] mapCounts = new long[reduceTaskCount]; | |
while (iterator.hasNext()) { | |
MRSeqNumCompositeValue value = (MRSeqNumCompositeValue) iterator.next(); | |
LOG.info("current task id = {} , current map counts = {}", | |
value.getTaskId(), value.getTaskOutputRecordCount()); | |
mapCounts[value.getTaskId()] = value.getTaskOutputRecordCount(); | |
} | |
LOG.info("total map counts = {}", mapCounts.length); | |
startSeqNumber = 0; | |
for (int i = 0; i < mapCounts.length; i++) { | |
if (i >= thisTaskId) { | |
LOG.info("exiting with start seq number = {}", startSeqNumber); | |
break; | |
} | |
startSeqNumber = startSeqNumber + mapCounts[i]; | |
} | |
return; | |
} | |
LOG.info("start sequence number = {}", startSeqNumber); | |
if (startSeqNumber == -1) { | |
throw new RuntimeException("seq number must have been found by now"); | |
} | |
Iterator<MRSeqNumCompositeValue> iterator = values.iterator(); | |
while (iterator.hasNext()) { | |
MRSeqNumCompositeValue value = (MRSeqNumCompositeValue) iterator.next(); | |
longWritable.set(startSeqNumber++); | |
context.write(longWritable, value.getValue()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.abhijit.mr.sequence.number; | |
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MRSequenceNumberAssigner extends Configured implements Tool { | |
private static final Logger LOG = LoggerFactory | |
.getLogger(MRSequenceNumberAssigner.class); | |
public int run(String[] args) throws Exception { | |
LOG.info("Starting job"); | |
if (args.length < 2) { | |
LOG.error("Incorrect inputs"); | |
printHelp(); | |
return -1; | |
} | |
String inputPathString = args[0]; | |
String outputPathString = args[1]; | |
Job job = new Job(getConf()); | |
job.setJarByClass(MRSeqNumMapper.class); | |
job.setMapperClass(MRSeqNumMapper.class); | |
job.setReducerClass(MRSeqNumReducer.class); | |
job.setOutputKeyClass(LongWritable.class); | |
job.setOutputValueClass(MRSeqNumCompositeValue.class); | |
job.setPartitionerClass(MRSeqNumPartitioner.class); | |
LOG.info("Adding {} in the list of input paths", inputPathString); | |
FileInputFormat.setInputPaths(job, new Path(inputPathString)); | |
FileOutputFormat.setOutputPath(job, new Path(outputPathString)); | |
int numberOfSplits = new TextInputFormat().getSplits(job).size(); | |
LOG.info("Number of splits = {}", numberOfSplits); | |
job.setNumReduceTasks(numberOfSplits); | |
boolean result = job.waitForCompletion(true); | |
if (!result) { | |
return -2; | |
} | |
return 0; | |
} | |
private void printHelp() { | |
LOG.info("Usage: hadoop job mrseqnum.jar <input_path> <output_path>"); | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new MRSequenceNumberAssigner(), args); | |
System.exit(exitCode); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.abhijit</groupId>
  <artifactId>mr-sequence-number</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>mr-sequence-number</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- NOTE(review): no Hadoop artifact is declared explicitly; the org.apache.hadoop
         classes used by the job presumably arrive transitively through hive-jdbc and/or
         pig. Those two come from different CDH lines (cdh4 and cdh3), so verify which
         Hadoop version actually ends up on the classpath. -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>0.10.0-cdh4.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.8.2</version>
      <!-- Mocking library: needed only by tests, keep it off the runtime classpath. -->
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.pig</groupId>
      <artifactId>pig</artifactId>
      <version>0.8.1-cdh3u3</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>cloudera-releases</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <!-- NOTE(review): no plugin version is pinned; consider pinning one for reproducible builds. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.