Last active
March 4, 2021 18:15
-
-
Save Bill/3e0587f43171d2d72718a945deb79fb3 to your computer and use it in GitHub Desktop.
An experiment with a ThreadPoolExecutor and various sync/async workQueues for https://cwiki.apache.org/confluence/display/GEODE/Thread+Pools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
class Scratch { | |
public static void main(String[] args) throws InterruptedException, ExecutionException { | |
for( final boolean allowCoreThreadTimeout : Arrays.asList(false, true)) { | |
for( final int queueLimit : Arrays.asList(0, 5)) { | |
for( final int corePoolSize : Arrays.asList(0, 5)) { | |
runExperiment(corePoolSize, 5, queueLimit, allowCoreThreadTimeout, 1); | |
} | |
} | |
} | |
} | |
private static void runExperiment(final int corePoolSize, final int maxPoolSize, | |
final int queueLimit, final boolean allowCoreThreadTimeout, | |
final long keepAliveSeconds) | |
throws InterruptedException, ExecutionException { | |
final BlockingQueue<Runnable> | |
workQueue = | |
queueLimit == 0 ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(queueLimit); | |
final ThreadFactory threadFactory = Executors.defaultThreadFactory(); | |
final ThreadPoolExecutor | |
tpe = | |
new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, | |
workQueue, threadFactory); | |
tpe.allowCoreThreadTimeOut(allowCoreThreadTimeout); | |
System.out.println("---------- Experiment Starting -------------"); | |
System.out.println(" corePoolSize: " + corePoolSize); | |
System.out.println(" maxPoolSize: " + maxPoolSize); | |
System.out.println(" BlockingQueue limit: " + queueLimit); | |
System.out.println("allowCoreThreadTimeout: " + allowCoreThreadTimeout); | |
final List<Future<?>> futures = new ArrayList<>(); | |
for (int i = 1; i <= 3; i++) { | |
futures.add(tpe.submit(new SampleWorker("task " + i))); | |
} | |
System.out.println(" concurrency: " + tpe.getActiveCount()); | |
for (Future<?> future : futures) { | |
future.get(); | |
} | |
tpe.awaitTermination(2, TimeUnit.SECONDS); | |
System.out.println(" Leftover threads: " + tpe.getPoolSize()); | |
tpe.shutdown(); | |
System.out.println("---------- Experiment Complete -------------"); | |
} | |
static class SampleWorker implements Runnable { | |
private String workerName; | |
SampleWorker(String tName) { | |
workerName = tName; | |
} | |
@Override | |
public void run() { | |
try { | |
for (int i = 1; i <= 3; i++) { | |
Thread.sleep(500); | |
System.out.println(this.workerName + " step " + i); | |
} | |
System.out.println(this.workerName + " finished"); | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output follows. Notice how, when core pool size is zero and we use a non-synchronous queue (
BlockingQueue
limit >0
) we see the tasks executed sequentially.But this is easily overcome by setting the
corePoolSize
>0
(we use5
in these tests.) TheThreadPoolExecutor
immediately assigns tasks to threads until the core pool size is hit. This immediate dispatch takes place regardless of the type (syncronous vs non-syncronous) ofBlockingQueue
specified.Also, the tests show it's also possible to drain the core pool when demand is dropped, by setting
allowCoreThreadTimeout
=true