Created
January 2, 2011 01:07
-
-
Save kzk/762172 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.kzk9; | |
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
/** | |
* HBaseテーブルの行数をカウントするMapReduceジョブ | |
* 行数はCounterでカウント。データ出力は無し。 | |
*/ | |
public class HBaseTableRowCounter { | |
/** | |
* hBaseのテーブルを入力とするMapper | |
* MapReduce Counter APIを使用して行数をカウント | |
*/ | |
static class RowCounterMapper | |
extends TableMapper<ImmutableBytesWritable, Result> { | |
public static enum Counters {ROWS} | |
// HBaseテーブルの1行がmap関数の入力 | |
@Override | |
public void map(ImmutableBytesWritable row, | |
Result values, | |
Context context) throws IOException { | |
for (KeyValue value: values.list()) { | |
if (value.getValue().length > 0) { | |
context.getCounter(Counters.ROWS).increment(1); | |
break; | |
} | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
// 設定情報の読み込み | |
Configuration conf = HBaseConfiguration.create(); | |
conf.addResource("/etc/hbase/conf/hbase-default.xml"); | |
conf.addResource("/etc/hbase/conf/hbase-site.xml"); | |
// 引数のパース | |
new GenericOptionsParser(conf, args); | |
// ジョブの作成 | |
String tableName = "sample"; | |
Job job = new Job(conf, "hBaseTableRowCounter_" + tableName); | |
job.setJarByClass(HBaseTableRowCounter.class); | |
// Reducerは使用しない。Counterで行数を数える。 | |
job.setOutputFormatClass(NullOutputFormat.class); | |
job.setNumReduceTasks(0); | |
// Scan条件の指定 | |
Scan scan = new Scan(); | |
// 各行に関して、最初のKey-ValueペアしかScanしない | |
scan.setFilter(new FirstKeyOnlyFilter()); | |
// 補助関数を利用したMapperの初期化 | |
TableMapReduceUtil.initTableMapperJob( | |
tableName, // テーブル名 | |
scan, // Mapperに渡す前に使用するScan | |
RowCounterMapper.class, // Mapperクラス | |
ImmutableBytesWritable.class, // MapperのKeyの型 | |
Result.class, // MapperのValueの型 | |
job); | |
// ジョブの実行 | |
System.exit(job.waitForCompletion(true) ? 0 : 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment