Created
January 2, 2011 03:05
-
-
Save kzk/762238 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.kzk9; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.*; | |
import org.apache.hadoop.io.*; | |
import org.apache.hadoop.hbase.*; | |
import org.apache.hadoop.hbase.client.*; | |
import org.apache.hadoop.hbase.filter.*; | |
import org.apache.hadoop.hbase.io.*; | |
import org.apache.hadoop.hbase.util.*; | |
import org.apache.hadoop.hbase.mapreduce.*; | |
import org.apache.hadoop.mapreduce.*; | |
import org.apache.hadoop.mapreduce.lib.output.*; | |
import org.apache.hadoop.util.*; | |
/** | |
* HBaseテーブルの行数をカウントするMapReduceジョブ | |
* 行数はCounterでカウント。データ出力は無し。 | |
*/ | |
public class HBaseTableUniqValueCounter { | |
/** | |
* hBaseのテーブルを入力とするMapper | |
* <Key, Values>が渡って来るので、<Value, 1>を出力するMapper | |
* Keyはテーブルの行キー, Valuesは全列の値 | |
*/ | |
static class UniqValueCounterMapper | |
extends TableMapper<ImmutableBytesWritable, IntWritable> { | |
ImmutableBytesWritable key = new ImmutableBytesWritable(); | |
IntWritable one = new IntWritable(1); | |
@Override | |
public void map(ImmutableBytesWritable row, | |
Result values, | |
Context context) throws IOException, InterruptedException { | |
for (KeyValue value: values.list()) { | |
key.set(value.getValue()); | |
// <Value, 1> を出力する | |
context.write(key, one); | |
} | |
} | |
} | |
/** | |
* hBaseのテーブルを出力とするReducer | |
* 出力Valueの型はPut or Resultで有る必要が有る | |
* Valueの出現回数のSumを取る | |
*/ | |
static class UniqValueCounterReducer | |
extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> { | |
byte[] family = Bytes.toBytes("data"); | |
byte[] column = Bytes.toBytes("column"); | |
@Override | |
public void reduce(ImmutableBytesWritable row, | |
Iterable<IntWritable> values, | |
Context context) throws IOException, InterruptedException { | |
// 値の出現回数をカウント | |
int sum = 0; | |
for (IntWritable value : values) | |
sum += value.get(); | |
// Putクラスを作成し、テーブルへの出力方法を指定 | |
Put put = new Put(row.get()); | |
put.add(family, column, Bytes.toBytes(Integer.toString(sum))); | |
// KeyはNullWritable, ValueはPutクラス | |
context.write(NullWritable.get(), put); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
// 設定情報の読み込み | |
Configuration conf = HBaseConfiguration.create(); | |
conf.addResource("/etc/hbase/conf/hbase-default.xml"); | |
conf.addResource("/etc/hbase/conf/hbase-site.xml"); | |
// 引数のパース | |
new GenericOptionsParser(conf, args); | |
// ジョブの作成 | |
String tableName = "sample"; | |
Job job = new Job(conf, "hBaseTableRowCounter_" + tableName); | |
job.setJarByClass(HBaseTableUniqValueCounter.class); | |
// 補助関数を利用したMapperの初期化 | |
TableMapReduceUtil.initTableMapperJob( | |
"sample", // 入力テーブル名 | |
new Scan(), // Mapperに渡す前に使用するScan | |
UniqValueCounterMapper.class, // Mapperクラス | |
ImmutableBytesWritable.class, // Mapperの出力Keyの型 | |
IntWritable.class, // Mapperの出力Valueの型 | |
job); | |
// 補助関数を利用したReducerの初期化 | |
TableMapReduceUtil.initTableReducerJob( | |
"sample_uniq", // 出力テーブル名 | |
UniqValueCounterReducer.class, | |
job); | |
// ジョブの実行 | |
System.exit(job.waitForCompletion(true) ? 0 : 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment