Created
February 6, 2012 21:51
-
-
Save friso/1755159 to your computer and use it in GitHub Desktop.
Iterate with flags
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class IterateWithFlagsJob { | |
| public static int run(String input, String output, int maxIterations) { | |
| boolean done = false; | |
| int iterationCount = 0; | |
| while (!done) { | |
| Scheme sourceScheme = new TextDelimited(new Fields("partition", "source", "list", "flags"), "\t"); | |
| Tap source = new Hfs(sourceScheme, currentIterationInputPath); | |
| Scheme sinkScheme = new TextDelimited(new Fields("partition", "source", "list", "flags"), "\t"); | |
| Tap sink = new Hfs(sinkScheme, currentIterationOutputPath, SinkMode.REPLACE); | |
| //SNIPPED SOME BOILERPLATE... | |
| Pipe iteration = new Pipe("iteration"); | |
| //For each input record, create a record per node (step 3) | |
| iteration = new Each(iteration, new FanOut()); | |
| //GROUP BY node ORDER BY flag, partition DESCENDING | |
| iteration = new GroupBy( | |
| iteration, | |
| new Fields("node"), | |
| new Fields("flag", "partition"), //it works because of the flag! | |
| true); | |
| //For every group, create records with the largest partition | |
| iteration = new Every( | |
| iteration, | |
| new MaxPartitionToTuples(), | |
| Fields.RESULTS); | |
| //GROUP BY source node ORDER BY partition DESCENDING (step 4) | |
| iteration = new GroupBy( | |
| iteration, | |
| new Fields("source"), | |
| new Fields("flag", "partition"), //again, the flag! | |
| true); | |
| //For every group, re-create the adjacency list | |
| //with the largest partition | |
| //This step also updates the counter | |
| iteration = new Every( | |
| iteration, | |
| new MaxPartitionToAdjacencyList(), | |
| Fields.RESULTS); | |
| //SNIPPED SOME BOILERPLATE... | |
| flow.complete(); | |
| //Grab counter value? | |
| long updatedPartitions = flow.getFlowStats().getCounterValue( | |
| MaxPartitionToAdjacencyList.COUNTER_GROUP, | |
| MaxPartitionToAdjacencyList.PARTITIONS_UPDATES_COUNTER_NAME); | |
| //Are we done? | |
| done = updatedPartitions == 0 || iterationCount == maxIterations - 1; | |
| iterationCount++; | |
| } | |
| return 0; | |
| } | |
| private static class MaxPartitionToAdjacencyListContext { | |
| int source; | |
| int partition = -1; | |
| List<Integer> targets; | |
| List<Byte> flags; | |
| public MaxPartitionToAdjacencyListContext() { | |
| this.targets = new ArrayList<Integer>(); | |
| this.flags = new ArrayList<Byte>(); | |
| } | |
| } | |
| @SuppressWarnings("serial") | |
| private static class MaxPartitionToAdjacencyList | |
| extends BaseOperation<MaxPartitionToAdjacencyListContext> | |
| implements Aggregator<MaxPartitionToAdjacencyListContext> { | |
| public static final String PARTITIONS_UPDATES_COUNTER_NAME = "Partitions updates"; | |
| public static final String COUNTER_GROUP = "graphs"; | |
| public MaxPartitionToAdjacencyList() { | |
| super(new Fields("partition", "source", "list", "flags")); | |
| } | |
| @Override | |
| public void start( | |
| FlowProcess flowProcess, | |
| AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
| MaxPartitionToAdjacencyListContext context = new MaxPartitionToAdjacencyListContext(); | |
| context.source = aggregatorCall.getGroup().getInteger("source"); | |
| aggregatorCall.setContext(context); | |
| } | |
| @Override | |
| public void aggregate( | |
| FlowProcess flowProcess, | |
| AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
| MaxPartitionToAdjacencyListContext context = aggregatorCall.getContext(); | |
| TupleEntry arguments = aggregatorCall.getArguments(); | |
| int node = arguments.getInteger("node"); | |
| int partition = arguments.getInteger("partition"); | |
| boolean flag = arguments.getBoolean("flag"); | |
| if (context.partition == -1) { | |
| context.partition = partition; | |
| } else { | |
| if (flag && context.partition > partition) { | |
| flowProcess.increment(COUNTER_GROUP, PARTITIONS_UPDATES_COUNTER_NAME, 1); | |
| } | |
| } | |
| if (node != context.source) { | |
| context.targets.add(node); | |
| //here, convert boolean flags back to 1's or 0's | |
| context.flags.add((byte) (flag ? 1 : 0)); | |
| } | |
| } | |
| @Override | |
| public void complete( | |
| FlowProcess flowProcess, | |
| AggregatorCall<MaxPartitionToAdjacencyListContext> aggregatorCall) { | |
| MaxPartitionToAdjacencyListContext context = aggregatorCall.getContext(); | |
| Tuple result = new Tuple( | |
| context.partition, | |
| context.source, | |
| StringUtils.joinObjects(",", context.targets), | |
| StringUtils.joinObjects(",", context.flags)); //Here's the flags | |
| aggregatorCall.getOutputCollector().add(result); | |
| } | |
| } | |
| @SuppressWarnings({ "serial", "rawtypes", "unchecked" }) | |
| private static class MaxPartitionToTuples | |
| extends BaseOperation | |
| implements Buffer { | |
| public MaxPartitionToTuples() { | |
| super(new Fields("partition", "node", "source", "flag")); | |
| } | |
| @Override | |
| public void operate( | |
| FlowProcess flowProcess, | |
| BufferCall bufferCall) { | |
| Iterator<TupleEntry> itr = bufferCall.getArgumentsIterator(); | |
| int maxPartition; | |
| TupleEntry entry = itr.next(); | |
| maxPartition = entry.getInteger("partition"); | |
| emitTuple(bufferCall, maxPartition, entry); | |
| while (itr.hasNext()) { | |
| entry = itr.next(); | |
| emitTuple(bufferCall, maxPartition, entry); | |
| } | |
| } | |
| private void emitTuple( | |
| BufferCall bufferCall, | |
| int maxPartition, | |
| TupleEntry entry) { | |
| Tuple result = new Tuple( | |
| maxPartition, | |
| entry.getInteger("node"), | |
| entry.getInteger("source"), | |
| entry.getBoolean("flag")); | |
| bufferCall.getOutputCollector().add(result); | |
| } | |
| } | |
| @SuppressWarnings({ "serial", "rawtypes" }) | |
| private static class FanOut | |
| extends BaseOperation | |
| implements Function { | |
| public FanOut() { | |
| super(new Fields("partition", "node", "source", "flag")); | |
| } | |
| @Override | |
| public void operate( | |
| FlowProcess flowProcess, | |
| FunctionCall functionCall) { | |
| TupleEntry args = functionCall.getArguments(); | |
| int partition = args.getInteger("partition"); | |
| int source = args.getInteger("source"); | |
| Tuple result = new Tuple(partition, source, source, true); | |
| functionCall.getOutputCollector().add(result); | |
| String[] nodeList = args.getString("list").split(","); | |
| String[] flagList = args.getString("flags").split(","); | |
| for (int c = 0; c < nodeList.length; c++) { | |
| //During fanout, we convert the flags to booleans | |
| result = new Tuple( | |
| partition, | |
| Integer.parseInt(nodeList[c]), | |
| source, | |
| Integer.parseInt(flagList[c]) != 0); | |
| functionCall.getOutputCollector().add(result); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment