package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
|
/**
 * Converts HBase tabular data into a format that is consumable by Map/Reduce.
 * Unlike the single-scan {@link TableInputFormat}, this input format accepts a
 * list of start/stop row pairs and aggregates the splits produced for every
 * range; optionally, the splits that land on one region server can be combined
 * so that each map task runs all of its scans against a single region server.
 */
public class MultipleScanTableInputFormat extends TableInputFormatBase implements Configurable {
|

  /** Class-local logger; the one in TableInputFormatBase is private. */
  private static final Log LOG = LogFactory.getLog(MultipleScanTableInputFormat.class);

  /** Configuration key for the serialized list of scan start rows. */
  public static final String START_ROWS = "hbase.mapreduce.startrows";
  /** Configuration key for the serialized list of scan stop rows. */
  public static final String STOP_ROWS = "hbase.mapreduce.stoprows";
  /** When true, splits are grouped by the region server that hosts them. */
  public static final String PARTITION_BY_REGION_SERVER = "hbase.mapreduce.partitionByRegionServer";
  /** Number of combined input splits to create per region server. */
  public static final String PARTITIONS_PER_REGION_SERVER = "hbase.mapreduce.partitionByRegionServer.numPartitionsPerRegionServer";
  public static final int PARTITIONS_PER_REGION_SERVER_DEFAULT = 1;
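
  // The two row keys above carry parallel lists: startRows.get(i) is paired
  // with stopRows.get(i) to form one scan range. Both lists are serialized
  // with convertRowsToString(List<byte[]>) below and must have equal length.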
|

  private Configuration conf;

  /** Parallel lists of scan boundaries, decoded from the configuration in setConf(). */
  private List<byte[]> startRows;
  private List<byte[]> stopRows;

  private boolean partitionByRegionServer;
  private int numPartitionsPerRegionServer;
|

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
|

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException {
    LOG.info(String.format("Task attempt: %s, table split: %s", context.getTaskAttemptID(), inputsplit));

    // When partitioning by region server, a single input split carries several
    // table splits, so build one reader per scan that will run against that
    // region server.
    if (partitionByRegionServer && inputsplit instanceof RegionServerInputSplit) {
      RegionServerInputSplit combined = (RegionServerInputSplit) inputsplit;
      LOG.info("Running scans against: " + combined.getRegionServer());

      List<TableRecordReader> readers = Lists.newArrayList();
      for (TableSplit split : combined.getSplits()) {
        readers.add(createRecordReaderForRegionServer(split, context));
      }

      return new RegionServerPartitionedRecordReader(readers);
    }

    return super.createRecordReader(inputsplit, context);
  }
|

  private TableRecordReader createRecordReaderForRegionServer(TableSplit split,
      TaskAttemptContext context) throws IOException {
    TableRecordReader trr = new TableRecordReader();

    // Copy the configured scan so each reader can narrow the row range without
    // mutating the scan instance shared by the other readers.
    Scan sc = new Scan(this.getScan());
    sc.setStartRow(split.getStartRow());
    sc.setStopRow(split.getEndRow());

    trr.setScan(sc);
    trr.setHTable(getHTable());

    return trr;
  }
|

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned, the row ranges, and the region-server partitioning options.
   *
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *      org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;

    this.partitionByRegionServer = conf.getBoolean(PARTITION_BY_REGION_SERVER, false);
    this.numPartitionsPerRegionServer = conf.getInt(PARTITIONS_PER_REGION_SERVER,
        PARTITIONS_PER_REGION_SERVER_DEFAULT);

    try {
      setHTable(new HTable(conf, conf.get(TableInputFormat.INPUT_TABLE)));
      setScan(TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)));

      this.startRows = convertStringToRows(conf.get(START_ROWS));
      this.stopRows = convertStringToRows(conf.get(STOP_ROWS));
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
|

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> source = getAggregatedSplits(context);

    if (!partitionByRegionServer) {
      return source;
    }

    // Group the splits by the region server that hosts them.
    Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
    for (InputSplit split : source) {
      TableSplit cast = (TableSplit) split;
      String rs = cast.getRegionLocation();

      partitioned.put(rs, cast);
    }

    // Combine the splits for each region server into numPartitionsPerRegionServer
    // input splits; with the default of one, every region server maps to a
    // single combined split.
    List<InputSplit> result = new ArrayList<InputSplit>();
    for (Entry<String, Collection<TableSplit>> entry : partitioned.asMap().entrySet()) {
      int partitionSize = (int) Math.ceil((double) entry.getValue().size() / numPartitionsPerRegionServer);
      for (List<TableSplit> partition : Iterables.partition(entry.getValue(), partitionSize)) {
        InputSplit split = new RegionServerInputSplit(entry.getKey(), partition);
        result.add(split);
      }
    }

    return result;
  }
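
  // Worked example for the grouping above: 10 table splits on one region
  // server with numPartitionsPerRegionServer = 3 gives a partition size of
  // ceil(10 / 3) = 4, so Iterables.partition yields three
  // RegionServerInputSplits holding 4, 4, and 2 table splits respectively.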
|

  /**
   * Runs the base class's split calculation once per configured row range and
   * concatenates the results, so two row ranges produce two batches of splits
   * in the returned list.
   */
  private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
    final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();

    final Scan scan = getScan();

    for (int i = 0; i < startRows.size(); i++) {
      scan.setStartRow(startRows.get(i));
      scan.setStopRow(stopRows.get(i));

      setScan(scan);

      aggregatedSplits.addAll(super.getSplits(context));
    }

    // Clear the per-range boundaries from the shared scan now that the
    // aggregated splits have been computed.
    scan.setStopRow(null);
    scan.setStartRow(null);

    setScan(scan);

    return aggregatedSplits;
  }
|

  /** Decodes a Base64 string produced by {@link #convertRowsToString(List)} back into row keys. */
  @SuppressWarnings("unchecked")
  private static List<byte[]> convertStringToRows(String base64) throws IOException, ClassNotFoundException {
    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
    ObjectInputStream is = new ObjectInputStream(bis);
    return (List<byte[]>) is.readObject();
  }

  /** Serializes a list of row keys into a Base64 string suitable for the job configuration. */
  public static String convertRowsToString(List<byte[]> rows) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(out);
    oos.writeObject(rows);
    // Flush so the ObjectOutputStream's internal buffer is fully drained
    // before the byte array is read.
    oos.flush();
    return Base64.encodeBytes(out.toByteArray());
  }
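
  // Round-trip sketch for the helpers above (row values are illustrative):
  //
  //   List<byte[]> rows = Lists.newArrayList(Bytes.toBytes("a"), Bytes.toBytes("m"));
  //   String encoded = convertRowsToString(rows);
  //   List<byte[]> decoded = convertStringToRows(encoded); // same contents
  //
  // convertStringToRows relies on Java serialization, so the encoded value
  // must come from convertRowsToString (or an equivalent ObjectOutputStream).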
|

}
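
// A minimal, hypothetical driver-side setup (table name and row ranges are
// placeholders; the table and scan use the standard TableInputFormat keys):
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.set(TableInputFormat.INPUT_TABLE, "my_table");
//   conf.set(TableInputFormat.SCAN,
//       TableMapReduceUtil.convertScanToString(new Scan()));
//   conf.set(MultipleScanTableInputFormat.START_ROWS,
//       MultipleScanTableInputFormat.convertRowsToString(startRows));
//   conf.set(MultipleScanTableInputFormat.STOP_ROWS,
//       MultipleScanTableInputFormat.convertRowsToString(stopRows));
//   conf.setBoolean(MultipleScanTableInputFormat.PARTITION_BY_REGION_SERVER, true);
//
//   Job job = Job.getInstance(conf);
//   job.setInputFormatClass(MultipleScanTableInputFormat.class);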