Skip to content

Instantly share code, notes, and snippets.

@friso
Created February 6, 2012 21:51
Show Gist options
  • Select an option

  • Save friso/1755159 to your computer and use it in GitHub Desktop.

Select an option

Save friso/1755159 to your computer and use it in GitHub Desktop.
Iterate with flags
public class IterateWithFlagsJob {
/**
 * Repeatedly builds and runs the "iteration" pipe assembly until either an
 * iteration reports zero partition updates or the iteration cap is reached.
 *
 * <p>Each pass fans every adjacency-list record out into per-node tuples,
 * groups them by node, re-groups them by source and rebuilds the adjacency
 * list. After the flow completes, the "Partitions updates" counter is read
 * and used as the convergence test.
 *
 * <p>NOTE(review): {@code flow}, {@code currentIterationInputPath} and
 * {@code currentIterationOutputPath} are not declared in the visible code;
 * presumably they come from the snipped boilerplate. {@code input} and
 * {@code output} are not referenced in the visible code either - confirm
 * they are used to derive the per-iteration paths.
 *
 * @param input         input location (only used, if at all, in the snipped boilerplate)
 * @param output        output location (only used, if at all, in the snipped boilerplate)
 * @param maxIterations upper bound on the number of passes. NOTE(review): for
 *                      values &lt;= 0 the cap comparison below never matches in
 *                      practice, so the loop ends only when no updates are counted
 * @return always 0
 */
public static int run(String input, String output, int maxIterations) {
boolean done = false;
int iterationCount = 0;
while (!done) {
// Source and sink share the same four-column, tab-delimited layout, so the
// output of one pass has the shape the next pass expects to read.
Scheme sourceScheme = new TextDelimited(new Fields("partition", "source", "list", "flags"), "\t");
Tap source = new Hfs(sourceScheme, currentIterationInputPath);
Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list", "flags"), "\t");
// SinkMode.REPLACE is requested for the sink tap.
Tap sink = new Hfs(sinkScheme, currentIterationOutputPath, SinkMode.REPLACE);
//SNIPPED SOME BOILERPLATE...
Pipe iteration = new Pipe("iteration");
//For each input record, create a record per node (step 3)
iteration = new Each(iteration, new FanOut());
//GROUP BY node ORDER BY flag, partition DESCENDING
iteration = new GroupBy(
iteration,
new Fields("node"),
new Fields("flag", "partition"), //it works because of the flag!
true);
//For every group, create records with the largest partition
iteration = new Every(
iteration,
new MaxPartitionToTuples(),
Fields.RESULTS);
//GROUP BY source node ORDER BY partition DESCENDING (step 4)
iteration = new GroupBy(
iteration,
new Fields("source"),
new Fields("flag", "partition"), //again, the flag!
true);
//For every group, re-create the adjacency list
//with the largest partition
//This step also updates the counter
iteration = new Every(
iteration,
new MaxPartitionToAdjacencyList(),
Fields.RESULTS);
//SNIPPED SOME BOILERPLATE...
// The counter is read only after complete() has returned.
flow.complete();
//Grab counter value?
long updatedPartitions = flow.getFlowStats().getCounterValue(
MaxPartitionToAdjacencyList.COUNTER_GROUP,
MaxPartitionToAdjacencyList.PARTITIONS_UPDATES_COUNTER_NAME);
//Are we done?
// Stop when nothing was updated in this pass, or when this was pass number
// maxIterations (iterationCount is zero-based and incremented afterwards).
done = updatedPartitions == 0 || iterationCount == maxIterations - 1;
iterationCount++;
}
return 0;
}
/**
 * Mutable per-group scratch state used while an adjacency list is rebuilt.
 * A fresh instance starts with two empty lists and a partition of -1, which
 * the aggregator treats as "no partition recorded yet".
 */
private static class MaxPartitionToAdjacencyListContext {
    /** Source node of the group being aggregated. */
    int source;

    /** Partition recorded for the group; -1 until the first tuple is seen. */
    int partition = -1;

    /** Nodes collected for the adjacency list, in arrival order. */
    final List<Integer> targets = new ArrayList<Integer>();

    /** One flag (1 or 0) per entry in {@link #targets}, in the same order. */
    final List<Byte> flags = new ArrayList<Byte>();
}
/**
 * Aggregator that collapses all tuples of one "source" group back into a
 * single adjacency-list record with fields
 * ("partition", "source", "list", "flags").
 *
 * <p>The partition of the first tuple seen in a group becomes the partition
 * of the output record. Every later tuple that is flagged and carries a
 * smaller partition than the recorded one bumps the
 * {@link #PARTITIONS_UPDATES_COUNTER_NAME} counter.
 */
@SuppressWarnings("serial")
private static class MaxPartitionToAdjacencyList
        extends BaseOperation<MaxPartitionToAdjacencyListContext>
        implements Aggregator<MaxPartitionToAdjacencyListContext> {

    public static final String PARTITIONS_UPDATES_COUNTER_NAME = "Partitions updates";
    public static final String COUNTER_GROUP = "graphs";

    public MaxPartitionToAdjacencyList() {
        super(new Fields("partition", "source", "list", "flags"));
    }

    /** Creates the per-group state and remembers the group's source node. */
    @Override
    public void start(
            FlowProcess flowProcess,
            AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
        MaxPartitionToAdjacencyListContext state = new MaxPartitionToAdjacencyListContext();
        state.source = aggregatorCall.getGroup().getInteger("source");
        aggregatorCall.setContext(state);
    }

    /**
     * Folds one tuple into the group state: records the partition on the
     * first tuple, counts flagged tuples with a smaller partition afterwards,
     * and appends every node other than the source to the adjacency list.
     */
    @Override
    public void aggregate(
            FlowProcess flowProcess,
            AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
        MaxPartitionToAdjacencyListContext state = aggregatorCall.getContext();
        TupleEntry tuple = aggregatorCall.getArguments();

        int node = tuple.getInteger("node");
        int tuplePartition = tuple.getInteger("partition");
        boolean flagged = tuple.getBoolean("flag");

        if (state.partition == -1) {
            // First tuple of the group: adopt its partition.
            state.partition = tuplePartition;
        } else if (flagged && state.partition > tuplePartition) {
            flowProcess.increment(COUNTER_GROUP, PARTITIONS_UPDATES_COUNTER_NAME, 1);
        }

        if (node == state.source) {
            // The source itself is not part of its own adjacency list.
            return;
        }

        state.targets.add(node);
        // Flags are written back out as 1's and 0's.
        byte encodedFlag = flagged ? (byte) 1 : (byte) 0;
        state.flags.add(encodedFlag);
    }

    /** Emits the rebuilt record with comma-joined node and flag lists. */
    @Override
    public void complete(
            FlowProcess flowProcess,
            AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) {
        MaxPartitionToAdjacencyListContext state = aggregatorCall.getContext();
        String joinedTargets = StringUtils.joinObjects(",", state.targets);
        String joinedFlags = StringUtils.joinObjects(",", state.flags);
        Tuple record = new Tuple(
                state.partition,
                state.source,
                joinedTargets,
                joinedFlags);
        aggregatorCall.getOutputCollector().add(record);
    }
}
/**
 * Buffer that re-emits every tuple of a group with its "partition" replaced
 * by the partition of the group's first tuple. Output fields are
 * ("partition", "node", "source", "flag").
 *
 * <p>The first tuple is assumed to carry the largest partition of the group;
 * that depends on the sort order applied by the preceding grouping.
 */
@SuppressWarnings({ "serial", "rawtypes", "unchecked" })
private static class MaxPartitionToTuples
        extends BaseOperation
        implements Buffer {

    public MaxPartitionToTuples() {
        super(new Fields("partition", "node", "source", "flag"));
    }

    /** Reads the leading tuple's partition and stamps it on the whole group. */
    @Override
    public void operate(
            FlowProcess flowProcess,
            BufferCall bufferCall) {
        Iterator<TupleEntry> groupEntries = bufferCall.getArgumentsIterator();

        TupleEntry current = groupEntries.next();
        final int groupPartition = current.getInteger("partition");
        relabel(bufferCall, groupPartition, current);

        for (; groupEntries.hasNext(); ) {
            current = groupEntries.next();
            relabel(bufferCall, groupPartition, current);
        }
    }

    /** Emits a copy of {@code entry} carrying {@code groupPartition}. */
    private void relabel(
            BufferCall bufferCall,
            int groupPartition,
            TupleEntry entry) {
        int node = entry.getInteger("node");
        int source = entry.getInteger("source");
        boolean flag = entry.getBoolean("flag");
        bufferCall.getOutputCollector().add(new Tuple(groupPartition, node, source, flag));
    }
}
/**
 * Function that explodes one adjacency-list record
 * ("partition", "source", "list", "flags") into one tuple per node with
 * fields ("partition", "node", "source", "flag").
 *
 * <p>A tuple for the source node itself (node == source, flag == true) is
 * always emitted first, followed by one tuple per entry of the comma-separated
 * "list" field, with the matching entry of "flags" converted to a boolean.
 *
 * <p>A record whose "list" field is null or empty produces only the self
 * tuple. Such records occur when a source has no other nodes in its
 * adjacency list.
 */
@SuppressWarnings({ "serial", "rawtypes" })
private static class FanOut
        extends BaseOperation
        implements Function {

    public FanOut() {
        super(new Fields("partition", "node", "source", "flag"));
    }

    /**
     * Emits the self tuple and then one tuple per listed node.
     *
     * @throws NumberFormatException if a non-empty list or flags entry is not an integer
     */
    @Override
    public void operate(
            FlowProcess flowProcess,
            FunctionCall functionCall) {
        TupleEntry args = functionCall.getArguments();
        int partition = args.getInteger("partition");
        int source = args.getInteger("source");

        // The source node always gets a flagged tuple of its own.
        Tuple result = new Tuple(partition, source, source, true);
        functionCall.getOutputCollector().add(result);

        String list = args.getString("list");
        if (list == null || list.isEmpty()) {
            // "".split(",") yields a single empty string, which would make
            // Integer.parseInt fail below; an empty list simply has no
            // further nodes to emit.
            return;
        }

        String[] nodeList = list.split(",");
        // NOTE(review): "flags" is expected to hold one entry per entry in
        // "list"; a shorter flags field still fails with an index error.
        String[] flagList = args.getString("flags").split(",");
        for (int c = 0; c < nodeList.length; c++) {
            //During fanout, we convert the flags to booleans
            result = new Tuple(
                    partition,
                    Integer.parseInt(nodeList[c]),
                    source,
                    Integer.parseInt(flagList[c]) != 0);
            functionCall.getOutputCollector().add(result);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment