-
-
Save hsanchez/085cb70905d080441be4a5d82d81cc60 to your computer and use it in GitHub Desktop.
ExecutorCompletionService example to asynchronously process elements from a queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.Queue; | |
import java.util.Random; | |
import java.util.concurrent.*; | |
public class QueueConsumer { | |
private static final Logger LOGGER = LoggerFactory.getLogger(QueueConsumer.class); | |
private Queue<Integer> queue; | |
private CompletionService<Result> completionService; | |
public QueueConsumer(Queue<Integer> queue) { | |
this.queue = queue; | |
ExecutorService executorService = Executors.newFixedThreadPool(5); | |
this.completionService = new ExecutorCompletionService(executorService); | |
} | |
public void consumeBatch() { | |
int workerCount = submitTasks(); | |
while (workerCount > 0) { | |
try { | |
Result result = completionService.take().get(); | |
System.out.printf(result.getResultOfLengthyTask()); | |
workerCount--; | |
} catch (InterruptedException e) { | |
// not sure what to do here but to log it... | |
LOGGER.error("Our worker got interrupted...{}", e); | |
} catch (ExecutionException e) { | |
if(e.getCause() instanceof ManageableException) { | |
// we can recover from this | |
// but we still log it | |
LOGGER.warn("Something didn't go quite well...{}", e); | |
} else { | |
// not much we can do | |
LOGGER.error("Something went really wrong...{}", e); | |
} | |
} | |
} | |
} | |
private int submitTasks() { | |
int workerCount = 0; | |
Integer next = queue.peek(); | |
while(next != null) { | |
try { | |
completionService.submit(new Worker(next)); | |
queue.poll(); | |
next = queue.peek(); | |
workerCount++; | |
} catch (RejectedExecutionException e) { | |
// executor can't accept anymore task so we stop submitting | |
LOGGER.warn("Can't submit anymore tasks...{}", e); | |
return workerCount; | |
} | |
} | |
return workerCount; | |
} | |
class ManageableException extends RuntimeException{} | |
class Result { | |
private String resultOfLengthyTask; | |
Result(String resultOfLengthyTask) { | |
this.resultOfLengthyTask = resultOfLengthyTask; | |
} | |
public String getResultOfLengthyTask() { | |
return resultOfLengthyTask; | |
} | |
} | |
class Worker implements Callable<Result> { | |
private Integer importantIntegerForOurLenghtyTask; | |
public Worker(Integer importantIntegerForOurLenghtyTask) { | |
this.importantIntegerForOurLenghtyTask = importantIntegerForOurLenghtyTask; | |
} | |
@Override | |
public Result call() throws Exception { | |
// here we do the stuff that takes a while and return the result | |
Thread.sleep(new Random().nextInt(10)); | |
return new Result("woooow " + importantIntegerForOurLenghtyTask); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment