Last active
April 12, 2021 21:14
-
-
Save isopropylcyanide/a2abee485c4d9b1f75fc4d9c235a8be9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A concurrent work executor that blocks for the final result after individual execution results | |
* are obtained. The results are fed into the queue represented by completion service as they are | |
* getting completed. Note that this behavior can be changed by using a single threaded executor | |
* <p> | |
* If execution of any individual work results in an exception, an exception is raised | |
*/ | |
static class OutOfOrderConcurrentWorkExecutor implements ConcurrentWorkExecutor { | |
@Override | |
public <T, U, V> V splitJoin(int size, Iterable<U> iterable, Function<U, T> mapper, Collector<T, ?, V> collector) throws ExecutionException, InterruptedException { | |
ThreadFactory threadFactory = Executors.defaultThreadFactory(); | |
ExecutorService executor = Executors.newCachedThreadPool(threadFactory); //create a cached thread pool ideal for short lived tasks | |
AtomicInteger submitTaskCount = new AtomicInteger(0); //to keep track of submitted tasks | |
CompletionService<T> service = new ExecutorCompletionService<>(executor); | |
try { | |
iterable.forEach(u -> { //note that iterable.size() must not be really large as cached thread pool is unbounded | |
//we typically restrict this size bound when we accept the request itself. | |
service.submit(() -> { | |
log.info("Submitting task f({}) on {}", u, Thread.currentThread().getName()); | |
return mapper.apply(u); | |
}); //user specified mapper is invoked here to create a task and submitted | |
submitTaskCount.incrementAndGet(); | |
}); | |
} finally { | |
executor.shutdown(); // stop accepting any more tasks except for the ones that are submitted | |
} | |
List<T> results = new ArrayList<>(); | |
for (int i = 0; i < Math.min(submitTaskCount.get(), size); i++) { //if we didn't do min(size, submitted) & size > submitted, it would lead to an | |
//infinite loop as take() blocks on an empty queue | |
T t; | |
try { | |
t = service.take().get(); | |
log.info("Finished {}/{}: {} on {}", i, submitTaskCount.get(), t, Thread.currentThread().getName()); | |
} catch (ExecutionException ex) { | |
log.error("Received error during computation for result in Thread [{}] {}", Thread.currentThread().getId(), ex.getMessage()); | |
throw ex; //letting the application handle it | |
} | |
if (t != null) { | |
results.add(t); //collecting the out of order result | |
} | |
} | |
return results.stream().collect(collector); //applying user specified collector to the collection of intermediate results | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment