Skip to content

Instantly share code, notes, and snippets.

@kzk
Created January 2, 2011 03:05
Show Gist options
  • Save kzk/762238 to your computer and use it in GitHub Desktop.
Save kzk/762238 to your computer and use it in GitHub Desktop.
package net.kzk9;
import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
/**
* HBaseテーブルの行数をカウントするMapReduceジョブ
* 行数はCounterでカウント。データ出力は無し。
*/
public class HBaseTableUniqValueCounter {
/**
* hBaseのテーブルを入力とするMapper
* <Key, Values>が渡って来るので、<Value, 1>を出力するMapper
* Keyはテーブルの行キー, Valuesは全列の値
*/
static class UniqValueCounterMapper
extends TableMapper<ImmutableBytesWritable, IntWritable> {
ImmutableBytesWritable key = new ImmutableBytesWritable();
IntWritable one = new IntWritable(1);
@Override
public void map(ImmutableBytesWritable row,
Result values,
Context context) throws IOException, InterruptedException {
for (KeyValue value: values.list()) {
key.set(value.getValue());
// <Value, 1> を出力する
context.write(key, one);
}
}
}
/**
* hBaseのテーブルを出力とするReducer
* 出力Valueの型はPut or Resultで有る必要が有る
* Valueの出現回数のSumを取る
*/
static class UniqValueCounterReducer
extends TableReducer<ImmutableBytesWritable, IntWritable, NullWritable> {
byte[] family = Bytes.toBytes("data");
byte[] column = Bytes.toBytes("column");
@Override
public void reduce(ImmutableBytesWritable row,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 値の出現回数をカウント
int sum = 0;
for (IntWritable value : values)
sum += value.get();
// Putクラスを作成し、テーブルへの出力方法を指定
Put put = new Put(row.get());
put.add(family, column, Bytes.toBytes(Integer.toString(sum)));
// KeyはNullWritable, ValueはPutクラス
context.write(NullWritable.get(), put);
}
}
public static void main(String[] args) throws Exception {
// 設定情報の読み込み
Configuration conf = HBaseConfiguration.create();
conf.addResource("/etc/hbase/conf/hbase-default.xml");
conf.addResource("/etc/hbase/conf/hbase-site.xml");
// 引数のパース
new GenericOptionsParser(conf, args);
// ジョブの作成
String tableName = "sample";
Job job = new Job(conf, "hBaseTableRowCounter_" + tableName);
job.setJarByClass(HBaseTableUniqValueCounter.class);
// 補助関数を利用したMapperの初期化
TableMapReduceUtil.initTableMapperJob(
"sample", // 入力テーブル名
new Scan(), // Mapperに渡す前に使用するScan
UniqValueCounterMapper.class, // Mapperクラス
ImmutableBytesWritable.class, // Mapperの出力Keyの型
IntWritable.class, // Mapperの出力Valueの型
job);
// 補助関数を利用したReducerの初期化
TableMapReduceUtil.initTableReducerJob(
"sample_uniq", // 出力テーブル名
UniqValueCounterReducer.class,
job);
// ジョブの実行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment