Skip to content

Instantly share code, notes, and snippets.

@alienrobotwizard
Created January 18, 2011 00:44
Show Gist options
  • Save alienrobotwizard/783805 to your computer and use it in GitHub Desktop.
/**
 * MapReduce job that converts tab-separated text lines into HBase {@code KeyValue}s,
 * sorted and written as HFiles via {@code HFileOutputFormat} for bulk loading.
 *
 * Configuration keys read at setup:
 *   hbase.table.name    - target table (read but currently unused by the mapper)
 *   hbase.column.family - column family for every emitted KeyValue
 *   hbase.key.field     - zero-based index of the field used as the row key (default 0)
 *   hbase.field.names   - comma-separated column qualifiers, positionally matching fields
 */
public class DataChunkToHFiles extends Configured implements Tool {

    /**
     * Maps one TSV line to one KeyValue per non-empty, non-key field.
     * Output key is the row key; output value is the KeyValue for the reducer to sort.
     */
    public static class TextToKeyValues extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        private byte[] columnFamily;
        private byte[] tableName;
        private int keyField;
        private String[] fieldNames;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            // NOTE(review): getBytes() uses the platform default charset; consider an
            // explicit UTF-8 charset if input may contain non-ASCII data.
            tableName = conf.get("hbase.table.name").getBytes();
            columnFamily = conf.get("hbase.column.family").getBytes();
            keyField = conf.getInt("hbase.key.field", 0); // default to field 0 as the row key
            fieldNames = conf.get("hbase.field.names").split(",");
        }

        @Override
        protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
            String[] fields = line.toString().split("\t");
            // Skip malformed lines that do not contain the key field at all;
            // indexing blindly would throw ArrayIndexOutOfBoundsException.
            if (fields.length <= keyField) {
                return;
            }
            byte[] rowKey = fields[keyField].getBytes();
            ImmutableBytesWritable hbaseRowKey = new ImmutableBytesWritable(rowKey);
            // Emit one KeyValue per named, non-empty field, skipping the key field itself.
            // (Previously hardcoded to index 1 — a leftover of debugging — which both
            // ignored the configured field names and crashed on one-field rows.)
            for (int i = 0; i < fields.length; i++) {
                if (i < fieldNames.length && i != keyField) {
                    if (fields[i].length() != 0) {
                        byte[] columnName = fieldNames[i].getBytes();
                        byte[] columnValue = fields[i].getBytes();
                        KeyValue kv = new KeyValue(rowKey, columnFamily, columnName, columnValue);
                        context.write(hbaseRowKey, kv);
                    }
                }
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path for the HFiles
     * @return 0 on success, 1 on job failure or bad arguments
     */
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(DataChunkToHFiles.class);
        job.setJobName(" @('_')@ data data data");
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setMapperClass(TextToKeyValues.class);
        job.setReducerClass(KeyValueSortReducer.class);
        // NOTE(review): a single reducer yields one globally-sorted HFile; for multiple
        // regions HFileOutputFormat.configureIncrementalLoad would set up a
        // TotalOrderPartitioner — confirm against the target table's region layout.
        job.setNumReduceTasks(1);
        // Hbase specific setup
        Configuration conf = job.getConfiguration();
        job.setOutputFormatClass(HFileOutputFormat.class);
        //
        // to play with
        //conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
        //
        // Handle input path
        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            other_args.add(args[i]);
        }
        if (other_args.size() < 2) {
            System.err.println("Usage: DataChunkToHFiles <input path> <output path>");
            return 1;
        }
        FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
        // Submit job to server and wait for completion; propagate failure to the
        // exit code (previously the result was discarded and 0 always returned).
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    /** CLI entry point; delegates to ToolRunner so -D options are honored. */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataChunkToHFiles(), args);
        System.exit(res);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment