FilterRowsWithGivenColumns: an HBase MapReduce program that emits the key of every row containing no cell in any of a given list of columns, keeping only rows whose cell timestamps all fall within a given range.
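The job takes six positional arguments: an output directory, a table name, a column family, a comma-separated list of column qualifiers, and a maximum and minimum cell timestamp (note that maxTS precedes minTS). A hypothetical launch; the jar name and every argument value below are placeholders:

hadoop jar hbase-experiments.jar hbase.experiments.FilterRowsWithGivenColumns \
    /tmp/filtered-rows mytable cf c1,c2 1350432000000 1300000000000

Matching row keys are written as Text keys (with empty values) in a SequenceFile under the output directory.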
package hbase.experiments;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.google.common.base.Splitter;
import com.google.common.primitives.Longs;
/**
 * A job which finds all rows that do not have a cell in any of the columns
 * in the given list of columns.
 */
public class FilterRowsWithGivenColumns {

  /** Name of this 'program'. */
  static final String NAME = "FilterRowsWithGivenColumns";

  private static final Log LOG =
      LogFactory.getLog(FilterRowsWithGivenColumns.class.getName());
  /**
   * Mapper that emits the keys of matching rows.
   */
  static class FilterRowsWithGivenColumnsMapper
      extends TableMapper<Text, Text> {

    private final Text empty = new Text("");
    private final Text keyToWrite = new Text();
    private long maxTs = Long.MAX_VALUE;
    private long minTs = Long.MIN_VALUE;

    /** Counter enumeration to count the actual rows. */
    public static enum Counters {
      MATCHING_ROWS, OUTSIDE_TIME_RANGE
    }

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      maxTs = context.getConfiguration().getLong("maxts", Long.MAX_VALUE);
      minTs = context.getConfiguration().getLong("mints", Long.MIN_VALUE);
    }
    /**
     * Maps the data.
     *
     * @param row The current table row key.
     * @param values The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
     *      org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
        throws IOException {
      // Guard against empty results: Longs.min/max reject empty arrays.
      if (values.isEmpty()) {
        return;
      }
      long[] tsArray = new long[values.size()];
      int i = 0;
      for (KeyValue value : values.list()) {
        tsArray[i] = value.getTimestamp();
        i++;
      }
      if (Longs.min(tsArray) < minTs || Longs.max(tsArray) > maxTs) {
        context.getCounter(Counters.OUTSIDE_TIME_RANGE).increment(1);
      } else {
        context.getCounter(Counters.MATCHING_ROWS).increment(1);
        keyToWrite.set(row.get());
        try {
          context.write(keyToWrite, empty);
        } catch (InterruptedException e) {
          // Propagate the interruption instead of swallowing it.
          throw new IOException(e);
        }
      }
    }
  }
  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    LOG.info("Working with args: " + Arrays.toString(args));
    Path outputDir = new Path(args[0]);
    String tableName = args[1];
    String family = args[2];
    String columns = args[3];
    long maxTs = Long.parseLong(args[4]);
    long minTs = Long.parseLong(args[5]);
    conf.setLong("maxts", maxTs);
    conf.setLong("mints", minTs);
    Iterable<String> columnsSet = Splitter.on(',').split(columns);
    // Each QualifierFilter passes a cell only when its qualifier differs from
    // the given column. Wrapped in a SkipFilter, a single cell that matches
    // one of the given columns causes the entire row to be skipped, so the
    // scan returns only rows containing none of the given columns.
    List<Filter> qualifierFilters = new ArrayList<Filter>();
    for (String qual : columnsSet) {
      qualifierFilters.add(new QualifierFilter(CompareOp.NOT_EQUAL,
          new BinaryComparator(Bytes.toBytes(qual))));
    }
    Filter skipFilter =
        new SkipFilter(new FilterList(Operator.MUST_PASS_ALL, qualifierFilters));

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(family));
    scan.setCacheBlocks(false);
    scan.setCaching(1000);
    scan.setFilter(skipFilter);
    LOG.info("Scan has been configured as " + scan);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(FilterRowsWithGivenColumns.class);
    // The mapper emits Text/Text, so declare those types here rather than
    // the scan's ImmutableBytesWritable/Result.
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
        FilterRowsWithGivenColumnsMapper.class, Text.class, Text.class, job);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    return job;
  }
  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 6) {
      System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
      System.err.println("Usage: " + NAME
          + " <output_dir> <tablename> <family> <column1,column2..> <maxTS> <minTS>");
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    boolean success = false;
    try {
      success = job.waitForCompletion(true);
    } catch (Exception e) {
      e.printStackTrace();
    }
    System.exit(success ? 0 : 1);
  }
}
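For sanity-checking before launching the full job, the same filter stack can be exercised with a plain client-side scan. The following is a minimal sketch, not part of the original gist: the class name PreviewFilteredRows, the table mytable, family cf, and columns c1/c2 are placeholder assumptions, and it targets the same 0.9x-era HBase client API used above.

package hbase.experiments;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.util.Bytes;

/** Prints the keys of rows in 'mytable' that have no 'c1' or 'c2' cell in family 'cf'. */
public class PreviewFilteredRows {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Same construction as the job: a cell passes only if its qualifier
    // differs from every listed column.
    FilterList mustMissAll = new FilterList(Operator.MUST_PASS_ALL);
    for (String qual : new String[] { "c1", "c2" }) {
      mustMissAll.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
          new BinaryComparator(Bytes.toBytes(qual))));
    }
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf"));
    // SkipFilter drops the whole row as soon as one cell fails the wrapped
    // filter, i.e. as soon as one cell's qualifier equals c1 or c2.
    scan.setFilter(new SkipFilter(mustMissAll));
    HTable table = new HTable(conf, "mytable");
    try {
      ResultScanner scanner = table.getScanner(scan);
      for (Result r : scanner) {
        System.out.println(Bytes.toString(r.getRow()));
      }
      scanner.close();
    } finally {
      table.close();
    }
  }
}

Because the SkipFilter is evaluated on the region servers, rows containing any of the given columns are dropped before they reach the client, which keeps both this preview and the MapReduce job itself cheap on wide tables.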