Created
July 3, 2012 12:00
-
-
Save zygm0nt/3039334 to your computer and use it in GitHub Desktop.
MapReduce with HBaseWD
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MRJob extends Configured implements Tool { | |
public static void main(String[] args) throws Exception { | |
int res = ToolRunner.run(HBaseConfiguration.create(), new MRJob(), args); | |
System.exit(res); | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
return runApp(args, help, options, cline); | |
} | |
private int runApp(String[] args, HelpFormatter help, Options options, CommandLine cline) { | |
try { | |
Configuration conf = getConf(); | |
... | |
if (!getRunningJob(inputFile, reverseKey, outputTableName, putClass).waitForCompletion(true)) | |
return -1; | |
} catch (Exception e) { | |
System.out.println(e); | |
e.printStackTrace(); | |
help.printHelp(CMDLINE, options); | |
return -1; | |
} | |
return 0; | |
} | |
private Job getRunningJob(Path inputPath, boolean reverseKey, String outputTableName, String putClass) throws IOException { | |
Configuration config = getConf(); | |
config.set(TableOutputFormat.OUTPUT_TABLE, outputTableName); | |
Job job = new Job(config, getClass().getSimpleName()); | |
job.setJarByClass(MRJob.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
job.setMapperClass(SimpleMapper.class); | |
job.setMapOutputKeyClass(ImmutableBytesWritable.class); | |
job.setMapOutputValueClass(Put.class); | |
FileInputFormat.addInputPath(job, inputPath); | |
TableMapReduceUtil.initTableReducerJob(outputTableName, null, job); | |
job.setNumReduceTasks(0); | |
return job; | |
} | |
public static class SimpleMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { | |
RowKeyDistributorByOneBytePrefix keyDistributor; | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
byte bucketsCount = (byte) 32; // distributing into 32 buckets | |
keyDistributor = new RowKeyDistributorByOneBytePrefix(bucketsCount); | |
} | |
@Override | |
public void map(LongWritable row, Text value, Context context) throws IOException, InterruptedException { | |
context.write(new ImmutableBytesWritable(Bytes.toBytes(OUTPUT_TABLE_NAME)), putForClass(key, ts, elems)); | |
} | |
private Put putForClass(String key, long timestamp, String[] elems) throws Exception { | |
String recalculatedKey = keyDistributor.getDistributedKey(key + (Long.MAX_VALUE - timestamp)) | |
return new Put(recalculatedKey); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment