Skip to content

Instantly share code, notes, and snippets.

@developer-sdk
Created March 28, 2017 13:22
Show Gist options
  • Select an option

  • Save developer-sdk/4f33cd5444989d63ef955e33e661d5be to your computer and use it in GitHub Desktop.

Select an option

Save developer-sdk/4f33cd5444989d63ef955e33e661d5be to your computer and use it in GitHub Desktop.
Hive UDAF example
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
 * Hive generic UDAF that sums a single column whose values are either {@code int}s or
 * {@code string}s holding base-10 integers.
 *
 * <p>SQL {@code NULL} values are skipped. The running total is a Java {@code int}, so totals
 * outside the {@code int} range wrap around silently.
 */
public class SumInt extends AbstractGenericUDAFResolver {

    /**
     * Chooses the evaluator matching the type of the single argument.
     *
     * @param info argument type descriptors supplied by Hive
     * @return a {@link SumStringEvaluator} for {@code string} arguments, or a
     *     {@link SumIntEvaluator} for {@code int} arguments
     * @throws SemanticException (as {@link UDFArgumentTypeException}) when the number of
     *     arguments is not exactly one, or the argument is not a primitive {@code string}
     *     or {@code int}
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        // Exactly one argument is accepted.
        if (info.length != 1) {
            throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected.");
        }
        // Reject non-primitive arguments (arrays, maps, structs, ...).
        if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
        }
        // Strings are parsed to int; ints are summed directly; everything else is rejected.
        PrimitiveCategory category = ((PrimitiveTypeInfo) info[0]).getPrimitiveCategory();
        switch (category) {
            case STRING:
                return new SumStringEvaluator();
            case INT:
                return new SumIntEvaluator();
            default:
                throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
        }
    }

    /**
     * Converts a primitive Java value to {@code int}.
     *
     * <p>{@link Number}s, such as the {@code Integer} partial sums produced by
     * {@code terminatePartial}, are narrowed with {@link Number#intValue()}. Anything else is
     * parsed from its trimmed string form.
     *
     * @param value non-null primitive Java object
     * @return the integer value
     * @throws HiveException if the value is not a parseable base-10 {@code int}; the original
     *     {@link NumberFormatException} is kept as the cause
     */
    private static int toInt(Object value) throws HiveException {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        String text = value.toString().trim();
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new HiveException("Cannot convert value '" + value + "' to int", e);
        }
    }

    /**
     * Evaluator that parses string values as base-10 {@code int}s and sums them.
     *
     * @author User
     */
    public static class SumStringEvaluator extends GenericUDAFEvaluator {
        // Inspector for parameters[0] as passed to init. In iterate-side modes this is the
        // string column. In merge-side modes Hive passes the partial-result inspector that
        // init returned (javaIntObjectInspector); the toInt helper handles both.
        private PrimitiveObjectInspector inputOI;

        /** Per-group running total. */
        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            // Both partial and final results are plain Java Integers.
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer sum = new SumAggregationBuffer();
            reset(sum);
            return sum;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        /**
         * Adds one row's value to the running total.
         *
         * <p>Rows with no argument or a SQL {@code NULL} value are skipped.
         *
         * @throws HiveException if the string cannot be parsed as an {@code int}
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length == 0) {
                return;
            }
            // Evaluate once instead of twice per row.
            Object value = inputOI.getPrimitiveJavaObject(parameters[0]);
            if (value == null) {
                return;
            }
            ((SumAggregationBuffer) agg).sum += toInt(value);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        /** Folds a partial sum from terminatePartial into the buffer; null partials are ignored. */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            Object value = inputOI.getPrimitiveJavaObject(partial);
            if (value != null) {
                ((SumAggregationBuffer) agg).sum += toInt(value);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }

    /**
     * Evaluator that sums {@code int} values.
     *
     * @author User
     */
    public static class SumIntEvaluator extends GenericUDAFEvaluator {
        // Inspector for parameters[0]: the int column, or the int partial-result inspector.
        private IntObjectInspector inputOI;

        /** Per-group running total. */
        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (IntObjectInspector) parameters[0];
            // Both partial and final results are plain Java Integers.
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer sum = new SumAggregationBuffer();
            reset(sum);
            return sum;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        /**
         * Adds one row's value to the running total.
         *
         * <p>Rows with no argument or a SQL {@code NULL} value are skipped. Previously a
         * {@code NULL} value caused a NullPointerException inside the inspector.
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length == 0 || parameters[0] == null) {
                return;
            }
            ((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        /** Folds a partial sum from terminatePartial into the buffer; null partials are ignored. */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ((SumAggregationBuffer) agg).sum += inputOI.get(partial);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }
}
@eigen2017
Copy link
Copy Markdown

best custom UDAF demo!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment