Skip to content

Instantly share code, notes, and snippets.

@kzk
Created April 13, 2011 17:42
Show Gist options
  • Save kzk/917996 to your computer and use it in GitHub Desktop.
Save kzk/917996 to your computer and use it in GitHub Desktop.
package net.kzk9;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import org.json.simple.*;
import org.json.simple.parser.*;
import org.msgpack.*;
import org.msgpack.object.*;
public class HBaseTwitterScreenNameCount {
/**
 * Mapper whose input is an HBase table.
 *
 * <p>Every cell handed to {@link #map} is decoded as a string and parsed as a JSON
 * document describing one tweet (despite the class name, the payload is parsed with
 * the JSON parser, not with MessagePack). For each tweet that carries a string
 * {@code id_str}, a string {@code text} and a {@code user} object with a string
 * {@code screen_name}, the pair {@code (screen_name, 1)} is emitted.
 *
 * <p>Counters: {@code DATA_BYTES} accumulates the size of every cell value seen,
 * {@code VALID_ROWS} counts emitted tweets, and {@code INVALID_ROWS} counts cells
 * that could not be parsed or lacked a required field.
 */
static class MsgPackParseMapper extends TableMapper<Text, IntWritable> {
    public static enum Counters { VALID_ROWS, INVALID_ROWS, DATA_BYTES }

    JSONParser parser;
    protected Text word = new Text();
    protected IntWritable one = new IntWritable(1);

    /**
     * Creates the JSON parser that is reused for every cell processed by this mapper.
     *
     * @param context the task context (unused)
     */
    @Override
    protected void setup(Context context) {
        parser = new JSONParser();
    }

    /**
     * Emits {@code (screen_name, 1)} for every cell of the row that holds a well-formed tweet.
     *
     * <p>Malformed cells are counted and skipped. Failures while writing output are
     * NOT treated as malformed input: they propagate to the framework.
     *
     * @param row     the row key (unused)
     * @param values  the cells of the current row
     * @param context the task context used for counters and output
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        List<KeyValue> cells = values.list();
        if (cells == null) {
            // NOTE(review): Result.list() appears to return null for an empty Result in
            // some HBase versions; nothing to do in that case.
            return;
        }
        for (KeyValue cell : cells) {
            // Fetch the value once and reuse it for both the byte counter and the parse.
            byte[] payload = cell.getValue();
            context.getCounter(Counters.DATA_BYTES).increment(payload.length);

            String screenName = extractScreenName(Bytes.toString(payload));
            if (screenName == null) {
                // Best effort: format and parse errors are counted, never fatal.
                context.getCounter(Counters.INVALID_ROWS).increment(1);
                continue;
            }

            word.set(screenName);
            context.write(word, one);
            context.getCounter(Counters.VALID_ROWS).increment(1);
        }
    }

    /**
     * Parses one tweet and returns its author's screen name.
     *
     * @param json the JSON text of a single tweet, may be {@code null}
     * @return the screen name, or {@code null} when the text is not valid JSON or a
     *         required field ({@code id_str}, {@code text}, {@code user.screen_name})
     *         is missing or has an unexpected type
     */
    private String extractScreenName(String json) {
        if (json == null) {
            return null;
        }
        try {
            Object parsed = parser.parse(new StringReader(json));
            if (!(parsed instanceof Map)) {
                return null;
            }
            Map<?, ?> tweet = (Map<?, ?>) parsed;
            // id_str and text are not used further, but a tweet without them is
            // considered invalid, so their presence is still required.
            if (!(tweet.get("id_str") instanceof String)) {
                return null;
            }
            if (!(tweet.get("text") instanceof String)) {
                return null;
            }
            Object user = tweet.get("user");
            if (!(user instanceof Map)) {
                return null;
            }
            Object screenName = ((Map<?, ?>) user).get("screen_name");
            return (screenName instanceof String) ? (String) screenName : null;
        } catch (Exception ignored) {
            // Any parser failure means "invalid row"; only parsing happens inside this
            // try block, so no output or interruption error can be hidden here.
            // Errors (e.g. OutOfMemoryError) are deliberately left to propagate.
            return null;
        }
    }
}
/**
 * Adds up the partial counts emitted for a key and writes the total.
 *
 * <p>Input and output types are identical, so the class can serve both as the
 * combiner and as the reducer of a job.
 */
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Output value holder, reused across calls to {@code reduce}. */
    protected IntWritable count = new IntWritable(0);

    /**
     * Writes {@code (key, total)} where {@code total} is the sum of all values of the key.
     *
     * @param key     the key being reduced
     * @param values  the partial counts for the key
     * @param context the task context receiving the output pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        final Iterator<IntWritable> partials = values.iterator();
        while (partials.hasNext()) {
            total = total + partials.next().get();
        }
        count.set(total);
        context.write(key, count);
    }
}
/**
 * Configures and runs the screen-name counting job over the HBase table {@code twitter}.
 *
 * <p>Usage: {@code HBaseTwitterScreenNameCount [generic options] <output path>}.
 * The process exits with status 0 when the job succeeds, 1 when it fails and 2 when
 * the output path argument is missing.
 *
 * @param args Hadoop generic options followed by the output directory
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception {
    // Load the configuration.
    Configuration conf = HBaseConfiguration.create();
    // addResource(String) resolves the name on the classpath; these are absolute
    // filesystem locations, so they have to be added as Path objects to be read.
    conf.addResource(new Path("/etc/hbase/conf/hbase-default.xml"));
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
    conf.set("hbase.client.scanner.caching", "1024");
    // conf.set("mapred.job.tracker", "local");

    // Parse the arguments: generic options are applied to conf, and whatever is left
    // over are this program's own arguments.
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (remaining.length < 1) {
        System.err.println("Usage: HBaseTwitterScreenNameCount [generic options] <output path>");
        System.exit(2);
    }

    // Create the job.
    String tableName = "twitter";
    Job job = new Job(conf, "HBaseTwitterAnalysis_" + tableName);
    job.setJarByClass(HBaseTwitterScreenNameCount.class);

    // Specify the scan conditions.
    Scan scan = new Scan();
    // Fetch only the first entry of each row.
    scan.setFilter(new FirstKeyOnlyFilter());
    // Fetch only data:json.
    scan = scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("json"));

    // Initialize the mapper through the helper function.
    TableMapReduceUtil.initTableMapperJob(tableName, // table name
            scan,                     // Scan applied before rows reach the mapper
            MsgPackParseMapper.class, // mapper class
            Text.class,               // mapper output key type
            IntWritable.class,        // mapper output value type
            job);

    // Initialize the reducer.
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(8);
    FileOutputFormat.setOutputPath(job, new Path(remaining[0]));

    // Run the job.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment