Skip to content

Instantly share code, notes, and snippets.

@amaya382
Last active September 21, 2016 04:38
Show Gist options
  • Save amaya382/9829ae548bc2a367d54415f924d67c84 to your computer and use it in GitHub Desktop.
Save amaya382/9829ae548bc2a367d54415f924d67c84 to your computer and use it in GitHub Desktop.
Skeleton for a Hive GenericUDAF (generic user-defined aggregate function)
package somewhere;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@Description(name = "skeleton",
value = "_FUNC_(something) - Returns something")
public class SkeletonUDAF extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
final ObjectInspector[] OIs = info.getParameterObjectInspectors();
if (OIs.length != 0) {
throw new UDFArgumentLengthException("Specify arguments.");
}
if (/*check type of OIs*/) {
throw new UDFArgumentTypeException(0, "Only type argument is acceptable but "
+ OIs[0].getTypeName() + " was passed as ``");
}
return new SkeletonUDAFEvaluator();
}
private static class SkeletonUDAFEvaluator extends GenericUDAFEvaluator {
// declare local OIs for PARTIAL1 and COMPLETE: OIs for a original data
// declare local OIs for PARTIAL2 and FINAL: OIs for partial aggregations
// private StructObjectInspector structOI;
// private StructField skeletonField;
// private ObjectInspector skeletonOI;
@AggregationType(estimable = true)
static class SkeletonAggregationBuffer extends AbstractAggregationBuffer {
// declare variables for buffering
@Override
public int estimate() {
return 0; // estimating value for size of *this* buffer
}
public void reset() {
// enable this buffer to be reused
}
}
@Override
public ObjectInspector init(Mode mode, ObjectInspector[] OIs) throws HiveException {
super.init(mode, OIs);
if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
// init local OIs by an original data
} else {
// init local OIs by a partial aggregation
// structOI = (StructObjectInspector) OIs[0];
// skeletonField = structOI.getStructFieldRef("skeleton");
// skeletonOI = (ObjectInspector) skeletonField.getFieldObjectInspector();
}
if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
// List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
// fieldOIs.add(ObjectInspectorFactory...);
// return ObjectInspectorFactory.getStandardStructObjectInspector(
// Arrays.asList("skeleton"), fieldOIs);
return null; // OI of a partial aggregation generated by `#terminatePartial(...)`
} else {
return null; // OI of a final result value, which is scalar, generated by `#terminate(...)`
}
}
@Override
public AbstractAggregationBuffer getNewAggregationBuffer() throws HiveException {
final SkeletonAggregationBuffer myAgg = new SkeletonAggregationBuffer();
reset(myAgg);
return myAgg;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
final SkeletonAggregationBuffer myAgg = (SkeletonAggregationBuffer) agg;
myAgg.reset();
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
final SkeletonAggregationBuffer myAgg = (SkeletonAggregationBuffer) agg;
// update myAgg by parameters
}
@Override
public void merge(AggregationBuffer agg, Object other) throws HiveException {
if (other == null) {
return;
}
final SkeletonAggregationBuffer myAgg = (SkeletonAggregationBuffer) agg;
// Object skeleton = skeletonOI.get(structOI.getStructFieldData(other, skeletonField));
// merge other into myAgg
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
final SkeletonAggregationBuffer myAgg = (SkeletonAggregationBuffer) agg;
// Object[] partialResult = new Object[1];
// partialResult[0] = witableSkeleton;
// return partialResult;
// return writable partial result by myAgg
return null;
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
final SkeletonAggregationBuffer myAgg = (SkeletonAggregationBuffer) agg;
// return writable result by myAgg
return null;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment