Skip to content

Instantly share code, notes, and snippets.

@paulschwarz
Created July 13, 2019 20:53
Show Gist options
  • Save paulschwarz/934a68c1597fcfb14c66bb26b23b51ac to your computer and use it in GitHub Desktop.
Save paulschwarz/934a68c1597fcfb14c66bb26b23b51ac to your computer and use it in GitHub Desktop.
package me.paulschwarz.processor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import me.paulschwarz.Source.Job;
/**
* See https://www.baeldung.com/java-blocking-queue
*/
public class BlockingQueueProcessor implements Processor {
private final Consumer<Job> work;
private final ThreadPoolExecutor workersExecutor;
private final BlockingQueue<Job> jobsQueue;
public BlockingQueueProcessor(Consumer<Job> work) {
this(work, Runtime.getRuntime().availableProcessors());
}
public BlockingQueueProcessor(Consumer<Job> work, int poolSize) {
this.work = work;
jobsQueue = new LinkedBlockingQueue<>(19); // TODO: no magic numbers
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("worker-%d/" + poolSize)
.build();
this.workersExecutor = (ThreadPoolExecutor) Executors
.newFixedThreadPool(poolSize, namedThreadFactory);
for (int i = 0; i < poolSize; i++) {
startWorker();
}
}
@Override
public void run(Supplier<List<Job>> jobsSupplier) {
List<Job> batch;
do {
batch = jobsSupplier.get();
batch.forEach(job -> {
try {
jobsQueue.put(job);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(String.format("added to queue. size: %d", jobsQueue.size()));
} while (batch.size() > 0);
// Send N poison pills to workers, where N = thread pool size
for (int i = 0; i < workersExecutor.getMaximumPoolSize(); i++) {
try {
jobsQueue.put(new Job(-1, null));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
workersExecutor.shutdown();
}
private void startWorker() {
workersExecutor.submit(() -> {
try {
while (true) {
Job job = jobsQueue.take();
if (job.isPoisonPill()) {
System.out.println(String.format("[%s] shutting down", Thread.currentThread().getName()));
return;
}
this.work.accept(job);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment