Created April 13, 2011 17:42
Save kzk/917996 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.kzk9; | |
import java.io.*; | |
import java.util.*; | |
import org.apache.hadoop.conf.*; | |
import org.apache.hadoop.io.*; | |
import org.apache.hadoop.fs.*; | |
import org.apache.hadoop.hbase.*; | |
import org.apache.hadoop.hbase.client.*; | |
import org.apache.hadoop.hbase.filter.*; | |
import org.apache.hadoop.hbase.io.*; | |
import org.apache.hadoop.hbase.util.*; | |
import org.apache.hadoop.hbase.mapreduce.*; | |
import org.apache.hadoop.mapreduce.*; | |
import org.apache.hadoop.mapreduce.lib.output.*; | |
import org.apache.hadoop.util.*; | |
import org.json.simple.*; | |
import org.json.simple.parser.*; | |
import org.msgpack.*; | |
import org.msgpack.object.*; | |
public class HBaseTwitterScreenNameCount { | |
/** | |
* hBaseのテーブルを入力とするMapper | |
*/ | |
static class MsgPackParseMapper extends TableMapper<Text, IntWritable> { | |
public static enum Counters { VALID_ROWS, INVALID_ROWS, DATA_BYTES } | |
JSONParser parser; | |
protected Text word = new Text(); | |
protected IntWritable one = new IntWritable(1); | |
@Override | |
protected void setup(Mapper.Context context) { | |
// JSONパーサーを初期化 | |
parser = new JSONParser(); | |
} | |
// HBaseテーブルの1行がTwitterのMessagePack形式のデータ | |
@Override | |
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException { | |
for (KeyValue value: values.list()) { | |
// count data | |
context.getCounter(Counters.DATA_BYTES).increment(value.getValue().length); | |
String json_str = Bytes.toString(value.getValue()); | |
try { | |
Map tweet = (Map)parser.parse(new StringReader(json_str)); | |
String id_str = (String)tweet.get("id_str"); | |
if (id_str == null) | |
throw new IOException("id_str not found"); | |
String text = (String)tweet.get("text"); | |
if (text == null) | |
throw new IOException("text not found"); | |
Map user = (Map)tweet.get("user"); | |
if (user == null) | |
throw new IOException("user not found"); | |
String screen_name = (String)user.get("screen_name"); | |
if (screen_name == null) | |
throw new IOException("screen_name not found"); | |
word.set(screen_name); | |
context.write(word, one); | |
// increment valid rows | |
context.getCounter(Counters.VALID_ROWS).increment(1); | |
} catch (Throwable e) { | |
// ignore format & parse error | |
context.getCounter(Counters.INVALID_ROWS).increment(1); | |
continue; | |
} | |
} | |
} | |
} | |
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { | |
protected IntWritable count = new IntWritable(0); | |
@Override | |
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { | |
int sum = 0; | |
for (IntWritable value : values) { | |
sum += value.get(); | |
} | |
count.set(sum); | |
context.write(key, count); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
// 設定情報の読み込み | |
Configuration conf = HBaseConfiguration.create(); | |
conf.addResource("/etc/hbase/conf/hbase-default.xml"); | |
conf.addResource("/etc/hbase/conf/hbase-site.xml"); | |
conf.set("hbase.client.scanner.caching", "1024"); | |
// conf.set("mapred.job.tracker", "local"); | |
// 引数のパース | |
new GenericOptionsParser(conf, args); | |
// ジョブの作成 | |
String tableName = "twitter"; | |
Job job = new Job(conf, "HBaseTwitterAnalysis_" + tableName); | |
job.setJarByClass(HBaseTwitterScreenNameCount.class); | |
// Scan条件の指定 | |
Scan scan = new Scan(); | |
// 最初のデータのみを取得 | |
scan.setFilter(new FirstKeyOnlyFilter()); | |
// data:jsonのみを取得 | |
scan = scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("json")); | |
// 補助関数を利用したMapperの初期化 | |
TableMapReduceUtil.initTableMapperJob(tableName, // テーブル名 | |
scan, // Mapperに渡す前に使用するScan | |
MsgPackParseMapper.class, // Mapperクラス | |
Text.class, // Mapperの出力Keyの型 | |
IntWritable.class, // Mapperの出力Valueの型 | |
job); | |
// Reducerの初期化 | |
job.setCombinerClass(Reduce.class); | |
job.setReducerClass(Reduce.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(IntWritable.class); | |
job.setNumReduceTasks(8); | |
FileOutputFormat.setOutputPath(job, new Path(args[0])); | |
// ジョブの実行 | |
System.exit(job.waitForCompletion(true) ? 0 : 1); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.