Created March 28, 2017 13:22
-
-
Save developer-sdk/4f33cd5444989d63ef955e33e661d5be to your computer and use it in GitHub Desktop.
Hive UDAF example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; | |
| import org.apache.hadoop.hive.ql.metadata.HiveException; | |
| import org.apache.hadoop.hive.ql.parse.SemanticException; | |
| import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; | |
| import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; | |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; | |
| import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; | |
| import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; | |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; | |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; | |
| import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; | |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
| public class SumInt extends AbstractGenericUDAFResolver { | |
| @Override | |
| public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { | |
| // 파라미터는 하나만 받음 | |
| if (info.length != 1) { | |
| throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected."); | |
| } | |
| // 파라미터의 카테고리가 프리미티브 타입이 아니면 예외 처리 | |
| if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { | |
| throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1."); | |
| } | |
| // 전달된 파라미터의 타입이 스트링이면 SumStringEvaluator, 아니면 SumIntEvaluator 처리 | |
| if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.STRING) { | |
| return new SumStringEvaluator(); | |
| } else if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.INT) { | |
| return new SumIntEvaluator(); | |
| } else { | |
| throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1."); | |
| } | |
| } | |
| /** | |
| * 문자열 int 를 변환하여 sum 하는 Evaluator | |
| * | |
| * @author User | |
| * | |
| */ | |
| public static class SumStringEvaluator extends GenericUDAFEvaluator { | |
| private PrimitiveObjectInspector inputOI; | |
| static class SumAggregationBuffer implements AggregationBuffer { | |
| int sum; | |
| } | |
| @Override | |
| public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { | |
| super.init(m, parameters); | |
| inputOI = (PrimitiveObjectInspector) parameters[0]; | |
| return PrimitiveObjectInspectorFactory.javaIntObjectInspector; | |
| } | |
| @Override | |
| public AggregationBuffer getNewAggregationBuffer() throws HiveException { | |
| SumAggregationBuffer sum = new SumAggregationBuffer(); | |
| reset(sum); | |
| return sum; | |
| } | |
| @Override | |
| public void reset(AggregationBuffer agg) throws HiveException { | |
| ((SumAggregationBuffer) agg).sum = 0; | |
| } | |
| @Override | |
| public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { | |
| // 전달받은 파라미터가 없거나, null 일경우 처리 | |
| if(parameters.length != 0 && inputOI.getPrimitiveJavaObject(parameters[0]) != null) { | |
| ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(parameters[0]).toString()); | |
| } | |
| } | |
| @Override | |
| public Object terminatePartial(AggregationBuffer agg) throws HiveException { | |
| return ((SumAggregationBuffer) agg).sum; | |
| } | |
| @Override | |
| public void merge(AggregationBuffer agg, Object partial) throws HiveException { | |
| ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(partial).toString()); | |
| } | |
| @Override | |
| public Object terminate(AggregationBuffer agg) throws HiveException { | |
| return ((SumAggregationBuffer) agg).sum; | |
| } | |
| } | |
| /** | |
| * int 값을 sum 하는 Evaluator | |
| * | |
| * @author User | |
| * | |
| */ | |
| public static class SumIntEvaluator extends GenericUDAFEvaluator { | |
| private IntObjectInspector inputOI; | |
| static class SumAggregationBuffer implements AggregationBuffer { | |
| int sum; | |
| } | |
| @Override | |
| public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { | |
| super.init(m, parameters); | |
| inputOI = (IntObjectInspector) parameters[0]; | |
| return PrimitiveObjectInspectorFactory.javaIntObjectInspector; | |
| } | |
| @Override | |
| public AggregationBuffer getNewAggregationBuffer() throws HiveException { | |
| SumAggregationBuffer sum = new SumAggregationBuffer(); | |
| reset(sum); | |
| return sum; | |
| } | |
| @Override | |
| public void reset(AggregationBuffer agg) throws HiveException { | |
| ((SumAggregationBuffer) agg).sum = 0; | |
| } | |
| @Override | |
| public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { | |
| ((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]); | |
| } | |
| @Override | |
| public Object terminatePartial(AggregationBuffer agg) throws HiveException { | |
| return ((SumAggregationBuffer) agg).sum; | |
| } | |
| @Override | |
| public void merge(AggregationBuffer agg, Object partial) throws HiveException { | |
| ((SumAggregationBuffer) agg).sum += inputOI.get(partial); | |
| } | |
| @Override | |
| public Object terminate(AggregationBuffer agg) throws HiveException { | |
| return ((SumAggregationBuffer) agg).sum; | |
| } | |
| } | |
| } |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.
best custom UDAF demo!