Last active
January 6, 2016 14:10
-
-
Save cykl/7f71c1a3dff3f881f3ba to your computer and use it in GitHub Desktop.
[Crunch] POC: Avoid the MaterializableIterable dance by storing all the PipelineResults in the pipeline instance.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fr.mediametrie.internet.weball.crunch; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import org.apache.crunch.PipelineResult; | |
import org.apache.crunch.impl.mr.MRPipeline; | |
import org.apache.crunch.impl.mr.MRPipelineExecution; | |
import org.apache.hadoop.conf.Configuration; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.util.concurrent.MoreExecutors; | |
/** | |
* A {@link MRPipeline} remembering all the {@link PipelineResult}s it has created. | |
* | |
* <p> | |
* Crunch's API makes it difficult to get access to the Hadoop counters when things like {@literal materialize()} are | |
* used: | |
* <ul> | |
* <li>If we stick to the public API, it is not possible to get access to the | |
* PipelineResult possibly created by a call to materialize. One has to cast | |
* the iterable to MaterializeIterable</li> | |
* <li>Code dealing with the iterable most likely do not care about counters | |
* at all, but must extract the PipelineResult and pass it to someone else to | |
* not loose it.</li> | |
* </ul> | |
*/ | |
public class HyperthymesticMRPipeline extends MRPipeline { | |
private final ConcurrentLinkedQueue<PipelineResult> allPipelineResults = new ConcurrentLinkedQueue<>(); | |
public HyperthymesticMRPipeline(Class<?> jarClass) { | |
super(jarClass); | |
} | |
public HyperthymesticMRPipeline(Class<?> jarClass, String name) { | |
super(jarClass, name); | |
} | |
public HyperthymesticMRPipeline(Class<?> jarClass, Configuration conf) { | |
super(jarClass, conf); | |
} | |
public HyperthymesticMRPipeline(Class<?> jarClass, String name, Configuration conf) { | |
super(jarClass, name, conf); | |
} | |
/** | |
* Gets all the {@link PipelineResult} created by the pipeline since its creation | |
*/ | |
public List<PipelineResult> getAllPipelineResults() { | |
return ImmutableList.copyOf(allPipelineResults); | |
} | |
@Override | |
public MRPipelineExecution runAsync() { | |
MRPipelineExecution mrPipelineExecution = super.runAsync(); | |
mrPipelineExecution.addListener(() -> { | |
PipelineResult pipelineResult = mrPipelineExecution.getResult(); | |
allPipelineResults.offer(pipelineResult); | |
}, MoreExecutors.sameThreadExecutor()); | |
return mrPipelineExecution; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment