Last active: January 2, 2016 18:49
-
-
Save haoch/8346176 to your computer and use it in GitHub Desktop.
Pig UDF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class COUNT extends EvalFunc<Long> implements Algebraic{ | |
public Long exec(Tuple input) throws IOException {return count(input);} | |
public String getInitial() {return Initial.class.getName();} | |
public String getIntermed() {return Intermed.class.getName();} | |
public String getFinal() {return Final.class.getName();} | |
static public class Initial extends EvalFunc<Tuple> { | |
public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(count(input));} | |
} | |
static public class Intermed extends EvalFunc<Tuple> { | |
public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));} | |
} | |
static public class Final extends EvalFunc<Long> { | |
public Tuple exec(Tuple input) throws IOException {return sum(input);} | |
} | |
static protected Long count(Tuple input) throws ExecException { | |
Object values = input.get(0); | |
if (values instanceof DataBag) return ((DataBag)values).size(); | |
else if (values instanceof Map) return new Long(((Map)values).size()); | |
} | |
static protected Long sum(Tuple input) throws ExecException, NumberFormatException { | |
DataBag values = (DataBag)input.get(0); | |
long sum = 0; | |
for (Iterator (Tuple) it = values.iterator(); it.hasNext();) { | |
Tuple t = it.next(); | |
sum += (Long)t.get(0); | |
} | |
return sum; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class IntMax extends EvalFunc<Integer> implements Algebraic, Accumulator<Integer> { | |
……. | |
/* Accumulator interface */ | |
private Integer intermediateMax = null; | |
@Override | |
public void accumulate(Tuple b) throws IOException { | |
try { | |
Integer curMax = max(b); | |
if (curMax == null) { | |
return; | |
} | |
/* if bag is not null, initialize intermediateMax to negative infinity */ | |
if (intermediateMax == null) { | |
intermediateMax = Integer.MIN_VALUE; | |
} | |
intermediateMax = java.lang.Math.max(intermediateMax, curMax); | |
} catch (ExecException ee) { | |
throw ee; | |
} catch (Exception e) { | |
int errCode = 2106; | |
String msg = "Error while computing max in " + this.getClass().getSimpleName(); | |
throw new ExecException(msg, errCode, PigException.BUG, e); | |
} | |
} | |
@Override | |
public void cleanup() { | |
intermediateMax = null; | |
} | |
@Override | |
public Integer getValue() { | |
return intermediateMax; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.Map; | |
import org.apache.pig.FilterFunc; | |
import org.apache.pig.PigException; | |
import org.apache.pig.backend.executionengine.ExecException; | |
import org.apache.pig.data.DataBag; | |
import org.apache.pig.data.Tuple; | |
import org.apache.pig.data.DataType; | |
/** | |
* Determine whether a bag or map is empty. | |
*/ | |
public class IsEmpty extends FilterFunc { | |
@Override | |
public Boolean exec(Tuple input) throws IOException { | |
try { | |
Object values = input.get(0); | |
if (values instanceof DataBag) | |
return ((DataBag)values).size() == 0; | |
else if (values instanceof Map) | |
return ((Map)values).size() == 0; | |
else { | |
int errCode = 2102; | |
String msg = "Cannot test a " + | |
DataType.findTypeName(values) + " for emptiness."; | |
throw new ExecException(msg, errCode, PigException.BUG); | |
} | |
} catch (ExecException ee) { | |
throw ee; | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment