Skip to content

Instantly share code, notes, and snippets.

@chriswhite199
Created March 9, 2013 01:00
Show Gist options
  • Save chriswhite199/5121876 to your computer and use it in GitHub Desktop.
Save chriswhite199/5121876 to your computer and use it in GitHub Desktop.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
/**
* <a href=
* "http://stackoverflow.com/questions/15292061/how-to-implement-a-java-mapreduce-that-produce-output-values-large-then-the-maxi"
* >Original Question</a>
*/
public class SO_15292061 {
/**
 * Composite key that contains the App ID and User ID.
 *
 * <p>Serialized as the App ID followed by the User ID, and ordered by App ID
 * first, then User ID. {@link #equals(Object)} and {@link #hashCode()} are
 * derived from the same two fields so that equality agrees with
 * {@link #compareTo(CompKey)} and the hash depends only on the key's content
 * rather than on object identity.
 */
public static class CompKey implements WritableComparable<CompKey> {
    /** App ID component; the primary sort field. */
    public Text appId = new Text();

    /** User ID component; the secondary sort field. */
    public Text userId = new Text();

    /**
     * Writes the App ID and then the User ID to {@code out}.
     *
     * @param out sink to serialize this key to
     * @throws IOException if either field fails to write
     */
    @Override
    public void write(DataOutput out) throws IOException {
        appId.write(out);
        userId.write(out);
    }

    /**
     * Reads the App ID and then the User ID from {@code in}, in the same
     * order used by {@link #write(DataOutput)}.
     *
     * @param in source to deserialize this key from
     * @throws IOException if either field fails to read
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        appId.readFields(in);
        userId.readFields(in);
    }

    /**
     * Orders keys by App ID, breaking ties with the User ID.
     *
     * @param o the key to compare against
     * @return negative, zero or positive as this key sorts before, equal to
     *         or after {@code o}
     */
    @Override
    public int compareTo(CompKey o) {
        int c = appId.compareTo(o.appId);
        if (c != 0) {
            return c;
        }
        return userId.compareTo(o.userId);
    }

    /**
     * Two keys are equal when both their App IDs and User IDs are equal.
     *
     * @param obj the object to compare against
     * @return {@code true} if {@code obj} is a {@code CompKey} with equal
     *         fields
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CompKey)) {
            return false;
        }
        CompKey other = (CompKey) obj;
        return appId.equals(other.appId) && userId.equals(other.userId);
    }

    /**
     * Combines the hash codes of the two fields, so equal keys always hash
     * alike.
     *
     * @return hash code computed from the App ID and User ID
     */
    @Override
    public int hashCode() {
        return 31 * appId.hashCode() + userId.hashCode();
    }

    /**
     * Renders the key as the App ID and User ID separated by a tab.
     *
     * @return human-readable form of this key
     */
    @Override
    public String toString() {
        return appId + "\t" + userId;
    }
}
/**
 * Partitions on {@link CompKey#appId} alone, ignoring the User ID and the
 * value - every {@link CompKey} with the same App ID therefore maps to the
 * same partition and is processed by the same reducer.
 */
public static class AppIdPartitioner extends Partitioner<CompKey, Writable> {
    /**
     * Picks the partition from the App ID's hash code.
     *
     * @param key composite key whose App ID selects the partition
     * @param value the record's value; not consulted
     * @param numReduceTasks number of partitions to spread keys over
     * @return the App ID's non-negative hash modulo {@code numReduceTasks}
     */
    @Override
    public int getPartition(CompKey key, Writable value, int numReduceTasks) {
        // Clear the sign bit so the remainder below can never be negative.
        final int nonNegativeHash = key.appId.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numReduceTasks;
    }
}
/**
 * Emits the App ID as the output key and the User ID as the output value,
 * once per reduce call. Assuming this is paired with an appropriate output
 * format as described in <a href=
 * "http://stackoverflow.com/questions/10140171/handling-large-output-values-from-reduce-step-in-hadoop"
 * >this answer</a> - this will allow you to write out a huge CSV list of
 * user IDs for the same App ID.
 */
public static class MyReducer
        extends Reducer<CompKey, NullWritable, Text, Text> {
    /**
     * Writes a single (App ID, User ID) pair taken from {@code key}.
     *
     * @param key composite key supplying both output fields
     * @param values the grouped values; not read
     * @param context sink for the output pair
     * @throws IOException if the write fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(CompKey key, Iterable<NullWritable> values,
            Context context) throws IOException, InterruptedException {
        final Text outputKey = key.appId;
        final Text outputValue = key.userId;
        context.write(outputKey, outputValue);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment