//mapper
package com.tistory.devyongsik.hadoop.mapre;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.tistory.devyongsik.hadoop.domain.StatisticParsedLog;
import com.tistory.devyongsik.hadoop.parser.GeneralSearchLogParser;
import com.tistory.devyongsik.hadoop.parser.SearchLogParser;

/**
 * @author need4spd, [email protected], 2013. 1. 17.
 */
public class KeywordMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    private SearchLogParser searchLogParser = new GeneralSearchLogParser();
    private Text outputKey = new Text();
    private final static DoubleWritable one = new DoubleWritable(1);
    private DoubleWritable outputValue = new DoubleWritable();

    // Job counters: records with no previous keyword, with one or more
    // previous keywords, and records that could not be parsed.
    private enum PreKeywordCounter {
        no_prekeyword, have_prekeyword, not_analysis
    }

    /**
     * output:
     * - KC: search count per keyword            (KC, log date, keyword : count)
     * - KR: response time per keyword           (KR, log date, keyword : response time)
     * - KP: re-search keyword list per keyword  (KP, log date, keyword, re-search keyword : count)
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String logText = value.toString();

        StatisticParsedLog statisticParsedLog = searchLogParser.parse(logText);
        if (statisticParsedLog == null) {
            // Unparseable line: count it and skip.
            context.getCounter(PreKeywordCounter.not_analysis).increment(1);
            return;
        }

        String keyword = statisticParsedLog.getKeyword();
        String preKeyword = statisticParsedLog.getPreKeyword();
        String responseTime = statisticParsedLog.getResponseTime();
        String logDate = statisticParsedLog.getLogDate();

        // 1. Search count per keyword
        outputKey.set("KC," + logDate + "," + keyword);
        context.write(outputKey, one);
        // Also emit a "total" record so the reducer can compute an overall count.
        outputKey.set("KC," + logDate + ",total");
        context.write(outputKey, one);

        // 2. Response time per keyword (averaged in the reducer)
        outputKey.set("KR," + logDate + "," + keyword);
        outputValue.set(Double.parseDouble(responseTime));
        context.write(outputKey, outputValue);
        // Also emit a "total" record for the overall average.
        outputKey.set("KR," + logDate + ",total");
        context.write(outputKey, outputValue);

        // 3. Re-search keyword list per keyword
        if (preKeyword != null && preKeyword.length() > 0) {
            // Multiple previous keywords are delimited by "||".
            String[] preKeywordArray = preKeyword.split("\\|\\|");
            for (String preKwd : preKeywordArray) {
                context.getCounter(PreKeywordCounter.have_prekeyword).increment(1);
                outputKey.set("KP," + logDate + "," + keyword + "," + preKwd);
                context.write(outputKey, one);
            }
        } else {
            context.getCounter(PreKeywordCounter.no_prekeyword).increment(1);
        }
    }
}
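
//domain + parser (hypothetical sketch; these classes are referenced above but not included in the gist)
// The following is a minimal guess at what StatisticParsedLog and SearchLogParser
// plausibly look like, inferred only from the calls in KeywordMapper. The
// constructor, field layout, and log format are assumptions, not the original code.
package com.tistory.devyongsik.hadoop.domain;

// Value object for one parsed search-log record. Only the four getters used
// by KeywordMapper are known from the gist.
public class StatisticParsedLog {
    private final String logDate;
    private final String keyword;
    private final String preKeyword;   // "||"-delimited previous keywords; may be empty
    private final String responseTime; // numeric string, parsed to double by the mapper

    public StatisticParsedLog(String logDate, String keyword, String preKeyword, String responseTime) {
        this.logDate = logDate;
        this.keyword = keyword;
        this.preKeyword = preKeyword;
        this.responseTime = responseTime;
    }

    public String getLogDate() { return logDate; }
    public String getKeyword() { return keyword; }
    public String getPreKeyword() { return preKeyword; }
    public String getResponseTime() { return responseTime; }
}

// (separate file)
package com.tistory.devyongsik.hadoop.parser;

import com.tistory.devyongsik.hadoop.domain.StatisticParsedLog;

// Contract implied by KeywordMapper: parse() returns null for lines it
// cannot handle, which the mapper counts under not_analysis.
public interface SearchLogParser {
    StatisticParsedLog parse(String logText);
}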
//reducer
package com.tistory.devyongsik.hadoop.mapre;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * @author need4spd, [email protected], 2013. 1. 18.
 */
public class KeywordReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private MultipleOutputs<Text, DoubleWritable> mos;
    private Text outputKey = new Text();
    private DoubleWritable outputValue = new DoubleWritable();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, DoubleWritable>(context);
    }

    /**
     * input:
     * KC: search count per keyword           (KC, log date, keyword : count)
     * KR: average response time per keyword  (KR, log date, keyword : count, response-time sum, average)
     * KP: re-search keywords per keyword     (KP, log date, keyword, re-search keyword : count)
     *
     * KC,20130106,hum : 1
     * KR,20130106,hum : 58
     * KP,20130106,hum,hum1 : 1
     * KP,20130106,hum,hum2 : 1
     */
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        String[] splitKey = key.toString().split(",");
        String gubun = splitKey[0]; // record type: KC, KR, or KP
        outputKey.set(splitKey[1] + "," + splitKey[2]);

        if ("KC".equals(gubun)) {
            // Search count per keyword: sum the 1s emitted by the mapper.
            double sumRequestCount = 0;
            for (DoubleWritable value : values) {
                sumRequestCount += value.get();
            }
            outputValue.set(sumRequestCount);
            mos.write("requestcountbykeyword", outputKey, outputValue);
        } else if ("KR".equals(gubun)) {
            // Average response time per keyword: sum the response times and
            // divide by the number of values seen for this key.
            double keyCount = 0;
            double sumResponseTime = 0;
            for (DoubleWritable value : values) {
                keyCount++;
                sumResponseTime += value.get();
            }
            double averageResponseTime = sumResponseTime / keyCount;
            outputValue.set(averageResponseTime);
            mos.write("averageresponsetimebykeyword", outputKey, outputValue);
        } else if ("KP".equals(gubun)) {
            // Re-search count per (keyword, re-search keyword) pair.
            // Skip the degenerate case where a keyword was re-searched as itself.
            if (splitKey[2].equals(splitKey[3])) {
                return;
            }
            double sumPreKeywordCount = 0;
            for (DoubleWritable value : values) {
                sumPreKeywordCount += value.get();
            }
            outputValue.set(sumPreKeywordCount);
            outputKey.set(splitKey[1] + "," + splitKey[2] + "," + splitKey[3]);
            mos.write("prekeywordcountbykeyword", outputKey, outputValue);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
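
// For the sample records in the Javadoc above, the three named outputs would
// contain lines like the following (TextOutputFormat tab-separates key and
// value; the -r-00000 suffix and the 1.0/58.0 rendering of DoubleWritable
// follow from the defaults):
//
// requestcountbykeyword-r-00000          (KC)
// 20130106,hum    1.0
//
// averageresponsetimebykeyword-r-00000   (KR)
// 20130106,hum    58.0
//
// prekeywordcountbykeyword-r-00000       (KP)
// 20130106,hum,hum1    1.0
// 20130106,hum,hum2    1.0
//
// Note that no combiner is configured, and the KR records could not safely use
// one: averaging is not associative, so a combiner emitting partial averages
// would skew the final result.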
//driver
package com.tistory.devyongsik.hadoop.mapre;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author need4spd, [email protected], 2013. 1. 21.
 */
public class KeywordStatisticWithMultipleOutputs extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage : <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "KeywordStatisticWithMultipleOutputs");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setJarByClass(KeywordStatisticWithMultipleOutputs.class);
        job.setMapperClass(KeywordMapper.class);
        job.setReducerClass(KeywordReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Register one named output per statistic; the reducer routes each
        // record type (KC, KR, KP) to its own file via MultipleOutputs.
        MultipleOutputs.addNamedOutput(job, "requestcountbykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class);
        MultipleOutputs.addNamedOutput(job, "averageresponsetimebykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class);
        MultipleOutputs.addNamedOutput(job, "prekeywordcountbykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class);

        // Propagate job failure to the caller instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new KeywordStatisticWithMultipleOutputs(), args);
        System.out.println("## RESULT : " + res);
        System.exit(res);
    }
}
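
// Packaged into a jar, the driver is launched with the standard "hadoop jar"
// command. The jar name and paths below are examples, not part of the gist:
//
// $ hadoop jar keyword-statistic.jar \
//       com.tistory.devyongsik.hadoop.mapre.KeywordStatisticWithMultipleOutputs \
//       /logs/search/20130106 /output/keyword-statistic
//
// Because the reducer writes only through MultipleOutputs and never calls
// context.write(), the default part-r-* files in the output directory will
// contain no records; the results live in the three named output files.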