Created
January 21, 2013 08:01
-
-
Save need4spd/4584416 to your computer and use it in GitHub Desktop.
Hadoop MultipleOutputs MapReduce sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//mapper | |
package com.tistory.devyongsik.hadoop.mapre; | |
import java.io.IOException; | |
import org.apache.hadoop.io.DoubleWritable; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import com.tistory.devyongsik.hadoop.domain.StatisticParsedLog; | |
import com.tistory.devyongsik.hadoop.parser.GeneralSearchLogParser; | |
import com.tistory.devyongsik.hadoop.parser.SearchLogParser; | |
/** | |
* @author need4spd, [email protected], 2013. 1. 17. | |
* | |
*/ | |
public class KeywordMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { | |
private SearchLogParser searchLogParser = new GeneralSearchLogParser(); | |
private Text outputKey = new Text(); | |
private final static DoubleWritable one = new DoubleWritable(1); | |
private DoubleWritable outputValue = new DoubleWritable(); | |
private enum PreKeywordCounter { | |
no_prekeyword, have_prekeyword, not_analysis | |
} | |
/** | |
* output : | |
* - KC,키워드, 키워드별 검색횟수 (KC, 로그날짜, 키워드 : 횟수) | |
* - KR,키워드, 키워드별 평균응답시간 (KR, 로그날짜, 키워드 : 평균응답시간) | |
* - KP,키워드, 키워드별 재검색어 목록 (KP, 로그날짜, 키워드, 재검색어 : 횟수) | |
*/ | |
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { | |
String logText = value.toString(); | |
//System.out.println("input text : " + logText); | |
StatisticParsedLog statisticParsedLog = searchLogParser.parse(logText); | |
if(statisticParsedLog == null) { | |
context.getCounter(PreKeywordCounter.not_analysis).increment(1); | |
return; | |
} | |
String keyword = statisticParsedLog.getKeyword(); | |
String preKeyword = statisticParsedLog.getPreKeyword(); | |
String responseTime = statisticParsedLog.getResponseTime(); | |
String logDate = statisticParsedLog.getLogDate(); | |
System.out.println("keyword : " + keyword); | |
//1. 키워드별 검색횟수 | |
//Text requestCountByKeywordOutputKey = new Text(); | |
//Text requestCountByKeywordOutputText = new Text(); | |
outputKey.set("KC,"+logDate+","+keyword); | |
context.write(outputKey, one); | |
//total을 하나 만들어서 넣는다. | |
outputKey.set("KC,"+logDate+","+"total"); | |
context.write(outputKey, one); | |
//2. 키워드별 평균응답시간 | |
//Text responseTimeByKeywordOutputKey = new Text(); | |
//Text responseTimeByKeywordOutputText = new Text(); | |
outputKey.set("KR,"+logDate+","+keyword); | |
outputValue.set(Double.parseDouble(responseTime)); | |
context.write(outputKey, outputValue); | |
//total을 하나 만들어서 넣는다. | |
outputKey.set("KR,"+logDate+","+"total"); | |
outputValue.set(Double.parseDouble(responseTime)); | |
context.write(outputKey, outputValue); | |
//3. 키워드별 재검색어 목록 | |
if(preKeyword != null && preKeyword.length() > 0) { | |
String[] preKeywordArray = preKeyword.split("\\|\\|"); | |
for(String preKwd : preKeywordArray) { | |
context.getCounter(PreKeywordCounter.have_prekeyword).increment(1); | |
//Text preKeywordByKeywordOutputKey = new Text(); | |
//Text preKeywordByKeywordOutputText = new Text(); | |
outputKey.set("KP,"+logDate+","+keyword+","+preKwd); | |
context.write(outputKey, one); | |
} | |
} else { | |
context.getCounter(PreKeywordCounter.no_prekeyword).increment(1); | |
} | |
} | |
} | |
//reducer | |
package com.tistory.devyongsik.hadoop.mapre; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import org.apache.hadoop.io.DoubleWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; | |
/** | |
* @author need4spd, [email protected], 2013. 1. 18. | |
* | |
*/ | |
public class KeywordReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { | |
private MultipleOutputs<Text, DoubleWritable> mos; | |
private Text outputKey = new Text(); | |
private DoubleWritable outputValue = new DoubleWritable(); | |
@Override | |
public void setup(Context context) throws IOException, InterruptedException { | |
mos = new MultipleOutputs<Text, DoubleWritable>(context); | |
} | |
/** | |
* input : | |
* KC,키워드, 키워드별 검색횟수 (KC, 로그날짜, 키워드 : 횟수) | |
* KR,키워드, 키워드별 평균응답시간 (KR, 로그날짜, 키워드 : 횟수, 응답시간합, 평균응답시간) | |
* KP,키워드, 키워드별 재검색어 목록 (KP, 로그날짜, 키워드, 재검색어 : 횟수) | |
* | |
* KC,20130106,hum : 1 | |
* KR,20130106,hum : 58 | |
* KP,20130106,hum,hum1 : 1; | |
* KP,20130106,hum,hum2 : 1; | |
*/ | |
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { | |
String[] splitKey = key.toString().split(","); | |
String gubun = splitKey[0]; | |
outputKey.set(splitKey[1]+","+splitKey[2]); | |
System.out.println("splitKey : " + Arrays.toString(splitKey)); | |
//키워드별 검색횟수 | |
if("KC".equals(gubun)) { | |
System.out.println("gubun KC!!"); | |
double sumRequestCount = 0; | |
for(DoubleWritable value : values) { | |
sumRequestCount += value.get(); | |
} | |
outputValue.set(sumRequestCount); | |
mos.write("requestcountbykeyword", outputKey, outputValue); | |
//키워드별 평균응답시간 | |
} else if("KR".equals(gubun)) { | |
double keyCount = 0; | |
System.out.println("gubun KR!!"); | |
double sumResponseTime = 0; | |
for(DoubleWritable value : values) { | |
keyCount++; | |
sumResponseTime += value.get(); | |
} | |
System.out.println("KR keyCounter : " + keyCount); | |
double averageResponseTime = sumResponseTime / keyCount; | |
outputValue.set(averageResponseTime); | |
mos.write("averageresponsetimebykeyword", outputKey, outputValue); | |
} else if("KP".equals(gubun)) { | |
System.out.println("gubun KP!!"); | |
if(splitKey[2].equals(splitKey[3])) { | |
return; | |
} | |
double sumPreKeywordCount = 0; | |
for(DoubleWritable value : values) { | |
sumPreKeywordCount += value.get(); | |
} | |
outputValue.set(sumPreKeywordCount); | |
outputKey.clear(); | |
outputKey.set(splitKey[1]+","+splitKey[2]+","+splitKey[3]); | |
System.out.println("gubun KP!! Write!!!"); | |
mos.write("prekeywordcountbykeyword", outputKey, outputValue); | |
} | |
} | |
@Override | |
public void cleanup(Context context) throws IOException, InterruptedException { | |
mos.close(); | |
} | |
} | |
//driver | |
package com.tistory.devyongsik.hadoop.mapre; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.DoubleWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.GenericOptionsParser; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
/** | |
* @author need4spd, [email protected], 2013. 1. 21. | |
* | |
*/ | |
public class KeywordStatisticWithMultipleOutputs extends Configured implements Tool { | |
public int run(String[] args) throws Exception { | |
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); | |
if(otherArgs.length != 2) { | |
System.err.println("Usage : <in> <out>"); | |
System.exit(2); | |
} | |
Job job = new Job(getConf(), "KeywordStatisticWithMultipleOutputs"); | |
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); | |
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); | |
job.setJarByClass(KeywordStatisticWithMultipleOutputs.class); | |
job.setMapperClass(KeywordMapper.class); | |
job.setReducerClass(KeywordReducer.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(DoubleWritable.class); | |
MultipleOutputs.addNamedOutput(job, "requestcountbykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class); | |
MultipleOutputs.addNamedOutput(job, "averageresponsetimebykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class); | |
MultipleOutputs.addNamedOutput(job, "prekeywordcountbykeyword", TextOutputFormat.class, Text.class, DoubleWritable.class); | |
job.waitForCompletion(true); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int res = ToolRunner.run(new Configuration(), new KeywordStatisticWithMultipleOutputs(), args); | |
System.out.println("## RESULT : " + res); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment