Skip to content

Instantly share code, notes, and snippets.

@zygm0nt
Created July 3, 2012 12:00
Show Gist options
  • Save zygm0nt/3039334 to your computer and use it in GitHub Desktop.
Save zygm0nt/3039334 to your computer and use it in GitHub Desktop.
MapReduce with HBaseWD
public class MRJob extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(HBaseConfiguration.create(), new MRJob(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
return runApp(args, help, options, cline);
}
private int runApp(String[] args, HelpFormatter help, Options options, CommandLine cline) {
try {
Configuration conf = getConf();
...
if (!getRunningJob(inputFile, reverseKey, outputTableName, putClass).waitForCompletion(true))
return -1;
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
help.printHelp(CMDLINE, options);
return -1;
}
return 0;
}
private Job getRunningJob(Path inputPath, boolean reverseKey, String outputTableName, String putClass) throws IOException {
Configuration config = getConf();
config.set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
Job job = new Job(config, getClass().getSimpleName());
job.setJarByClass(MRJob.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(SimpleMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, inputPath);
TableMapReduceUtil.initTableReducerJob(outputTableName, null, job);
job.setNumReduceTasks(0);
return job;
}
public static class SimpleMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
RowKeyDistributorByOneBytePrefix keyDistributor;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
byte bucketsCount = (byte) 32; // distributing into 32 buckets
keyDistributor = new RowKeyDistributorByOneBytePrefix(bucketsCount);
}
@Override
public void map(LongWritable row, Text value, Context context) throws IOException, InterruptedException {
context.write(new ImmutableBytesWritable(Bytes.toBytes(OUTPUT_TABLE_NAME)), putForClass(key, ts, elems));
}
private Put putForClass(String key, long timestamp, String[] elems) throws Exception {
String recalculatedKey = keyDistributor.getDistributedKey(key + (Long.MAX_VALUE - timestamp))
return new Put(recalculatedKey);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment