Created
January 18, 2011 00:44
-
-
Save alienrobotwizard/783805 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DataChunkToHFiles extends Configured implements Tool { | |
public static class TextToKeyValues extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { | |
private byte[] columnFamily; | |
private byte[] tableName; | |
private int keyField; | |
private String[] fieldNames; | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
super.setup(context); | |
Configuration conf = context.getConfiguration(); | |
tableName = conf.get("hbase.table.name").getBytes(); | |
columnFamily = conf.get("hbase.column.family").getBytes(); | |
keyField = conf.getInt("hbase.key.field", 0); //default to field 0 as the row key | |
fieldNames = conf.get("hbase.field.names").split(","); | |
} | |
protected void map(LongWritable key, Text line, Context context) throws IOException ,InterruptedException { | |
String[] fields = line.toString().split("\t"); | |
byte[] rowKey = fields[keyField].getBytes(); | |
// Create output for Hbase reducer | |
ImmutableBytesWritable hbaseRowKey = new ImmutableBytesWritable(rowKey); | |
// for(int i = 0; i < fields.length; i++) { | |
// if (i < fieldNames.length && i != keyField) { | |
// if (fields[i].length() != 0) { | |
byte[] columnName = fieldNames[1].getBytes(); | |
byte[] columnValue = fields[1].getBytes(); | |
KeyValue kv = new KeyValue(rowKey, columnFamily, columnName, columnValue); | |
context.write(hbaseRowKey, kv); | |
// } | |
// } | |
// } | |
} | |
} | |
public int run(String[] args) throws Exception { | |
Job job = new Job(getConf()); | |
job.setJarByClass(DataChunkToHFiles.class); | |
job.setJobName(" @('_')@ data data data"); | |
job.setMapOutputKeyClass(ImmutableBytesWritable.class); | |
job.setMapOutputValueClass(KeyValue.class); | |
job.setMapperClass(TextToKeyValues.class); | |
job.setReducerClass(KeyValueSortReducer.class); | |
job.setNumReduceTasks(1); | |
// Hbase specific setup | |
Configuration conf = job.getConfiguration(); | |
job.setOutputFormatClass(HFileOutputFormat.class); | |
// | |
// to play with | |
//conf.setLong("hbase.hregion.max.filesize", 64 * 1024); | |
// | |
// Handle input path | |
List<String> other_args = new ArrayList<String>(); | |
for (int i=0; i < args.length; ++i) { | |
other_args.add(args[i]); | |
} | |
FileInputFormat.setInputPaths(job, new Path(other_args.get(0))); | |
FileOutputFormat.setOutputPath(job, new Path(other_args.get(1))); | |
// Submit job to server and wait for completion | |
job.waitForCompletion(true); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int res = ToolRunner.run(new Configuration(), new DataChunkToHFiles(), args); | |
System.exit(res); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment