Skip to content

Instantly share code, notes, and snippets.

@haoch
Last active January 2, 2016 18:49
Show Gist options
  • Save haoch/8346176 to your computer and use it in GitHub Desktop.
Save haoch/8346176 to your computer and use it in GitHub Desktop.
Pig UDF examples: COUNT (Algebraic), IntMax (Accumulator), and IsEmpty (FilterFunc)
public class COUNT extends EvalFunc<Long> implements Algebraic{
public Long exec(Tuple input) throws IOException {return count(input);}
public String getInitial() {return Initial.class.getName();}
public String getIntermed() {return Intermed.class.getName();}
public String getFinal() {return Final.class.getName();}
static public class Initial extends EvalFunc<Tuple> {
public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(count(input));}
}
static public class Intermed extends EvalFunc<Tuple> {
public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));}
}
static public class Final extends EvalFunc<Long> {
public Tuple exec(Tuple input) throws IOException {return sum(input);}
}
static protected Long count(Tuple input) throws ExecException {
Object values = input.get(0);
if (values instanceof DataBag) return ((DataBag)values).size();
else if (values instanceof Map) return new Long(((Map)values).size());
}
static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
DataBag values = (DataBag)input.get(0);
long sum = 0;
for (Iterator (Tuple) it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
sum += (Long)t.get(0);
}
return sum;
}
}
public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> {
…….
/* Accumulator interface */

/** Running maximum across accumulated bags; null until some bag yields a non-null max. */
private Integer intermediateMax = null;

/**
 * Folds the maximum of {@code b} into the running maximum.
 *
 * <p>{@code max(b)} is not shown in this snippet; presumably it returns the bag's maximum,
 * or null when there is nothing to compare (TODO confirm). A null result leaves the
 * running maximum unchanged.
 *
 * @param b tuple passed by Pig for this accumulation step
 * @throws IOException an ExecException from max(b) is rethrown unchanged; any other
 *     failure is wrapped in an ExecException with error code 2106
 */
@Override
public void accumulate(Tuple b) throws IOException {
    try {
        Integer bagMax = max(b);
        if (bagMax != null) {
            // Seed with negative infinity the first time a non-null max arrives.
            int running = (intermediateMax != null) ? intermediateMax : Integer.MIN_VALUE;
            intermediateMax = java.lang.Math.max(running, bagMax);
        }
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;
        String msg = "Error while computing max in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}
/** Resets the running maximum to its initial, unseeded (null) state. */
@Override
public void cleanup() {
    this.intermediateMax = null;
}
/**
 * Returns the running maximum.
 *
 * @return the largest value folded in by accumulate(), or null if no non-null maximum
 *     has been accumulated since construction or the last cleanup()
 */
@Override
public Integer getValue() {
    final Integer current = this.intermediateMax;
    return current;
}
}
import java.io.IOException;
import java.util.Map;
import org.apache.pig.FilterFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataType;
/**
 * Filter function that determines whether a bag or map is empty.
 *
 * <p>The collection under test is field 0 of the input tuple. Any other value, including
 * null, is rejected with an ExecException carrying error code 2102.
 */
public class IsEmpty extends FilterFunc {

    /**
     * Tests the bag or map in field 0 of {@code input} for emptiness.
     *
     * @param input tuple whose first field is a DataBag or a Map
     * @return true if the collection has no elements, false otherwise
     * @throws IOException an ExecException if field 0 cannot be read or is neither a
     *     DataBag nor a Map
     */
    @Override
    public Boolean exec(Tuple input) throws IOException {
        // An ExecException from input.get(0) propagates unchanged; the old
        // catch-and-rethrow added nothing.
        Object values = input.get(0);
        if (values instanceof DataBag) {
            DataBag bag = (DataBag) values;
            return bag.size() == 0;
        }
        if (values instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) values;
            return map.size() == 0;
        }
        int errCode = 2102;
        String msg = "Cannot test a " + DataType.findTypeName(values) + " for emptiness.";
        throw new ExecException(msg, errCode, PigException.BUG);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment