Skip to content

Instantly share code, notes, and snippets.

@tadas-subonis
Created April 23, 2013 22:08
Show Gist options
  • Save tadas-subonis/5447832 to your computer and use it in GitHub Desktop.
Save tadas-subonis/5447832 to your computer and use it in GitHub Desktop.
import java.util.concurrent.*;
import java.util.regex.Pattern;
import org.testng.annotations.*;
import static org.testng.Assert.*;
/**
 * TestNG tests exercising {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)}.
 *
 * <p>Each test gets a fresh executor whose core and maximum pool sizes are both
 * {@link #threadPoolSize} and whose keep-alive time is {@link #timeoutMillis} milliseconds.
 * Worker threads are recognised by name: the thread factory prefixes every thread name with
 * {@code "CoreThreadTimeOut-"} and {@link #countExecutorThreads()} counts the live threads
 * whose name matches that scheme.
 */
public class CoreThreadTimeOut {

    /** Core and maximum pool size of the executor under test. */
    final int threadPoolSize = 10;

    /** Keep-alive time, in milliseconds, configured on the executor under test. */
    final int timeoutMillis = 30;

    /** Upper bound, in milliseconds, for any "wait until the pool settles" polling loop. */
    private static final long MAX_WAIT_MILLIS = 10L * 1000L;

    /** Pause, in milliseconds, between two polls of the live thread count. */
    private static final long POLL_INTERVAL_MILLIS = 5L;

    /** Name scheme of the worker threads produced by {@link IdentifiableThreadFactory}. */
    private static final Pattern EXECUTOR_THREAD_NAME =
            Pattern.compile("CoreThreadTimeOut-pool-[0-9]+-thread-[0-9]+");

    private ThreadPoolExecutor threadPoolExecutor;

    /** Creates a fresh executor (core thread time-out enabled) before every test. */
    @BeforeMethod
    void setUp() {
        threadPoolExecutor = prepareThreadPoolExecutor();
    }

    /**
     * Shuts the executor down and waits, with an upper bound, until it has terminated and all
     * of its worker threads are gone.
     *
     * <p>The second wait matters because every test in this class counts threads by name: a
     * worker that is still winding down after the pool reported termination would otherwise be
     * counted by the next test.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @AfterMethod
    void tearDown() throws InterruptedException {
        threadPoolExecutor.shutdown();
        // Bounded wait: a stuck task fails the run instead of hanging it forever.
        assertTrue(threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS));
        assertEquals(awaitExecutorThreadCount(0), 0,
                "worker threads still alive after executor termination");
        threadPoolExecutor = null;
    }

    /** A freshly prepared executor allows core thread time-out and has started no threads. */
    @Test
    public void correctThreadPoolExecutorShouldBeCreated() {
        assertTrue(threadPoolExecutor.allowsCoreThreadTimeOut());
        assertEquals(countExecutorThreads(), 0);
    }

    /** The core thread time-out flag can be switched off and back on. */
    @Test
    public void threadPoolExecutorShouldBeAbleToChangeCoreThreadTimeoutSetting() {
        threadPoolExecutor.allowCoreThreadTimeOut(false);
        assertFalse(threadPoolExecutor.allowsCoreThreadTimeOut());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        assertTrue(threadPoolExecutor.allowsCoreThreadTimeOut());
    }

    /**
     * With core thread time-out enabled, all worker threads disappear once the keep-alive
     * period has passed.
     */
    @Test
    public void shouldRemoveThreadsAfterCoreThreadTimeoutPeriod() throws InterruptedException {
        CountDownLatch untilAllThreadsDone = startaBunchOfThreads();
        untilAllThreadsDone.await();
        waitUntilTimeoutOccurs();
        // Poll instead of trusting a single fixed sleep: on a loaded machine the idle workers
        // may need a little longer than twice the keep-alive time to actually exit.
        assertEquals(awaitExecutorThreadCount(0), 0);
    }

    /**
     * Shortly after the tasks finished (half the keep-alive time) all worker threads are
     * still alive.
     *
     * <p>NOTE(review): this is inherently timing-sensitive; a long scheduling pause on the
     * test thread can let the workers time out before the count is taken.
     */
    @Test
    public void threadsShouldBePresentBeforeKeepAliveTimeKicks() throws InterruptedException {
        CountDownLatch untilAllThreadsDone = startaBunchOfThreads();
        untilAllThreadsDone.await();
        waitBeforeTimeoutOccurs();
        assertEquals(countExecutorThreads(), threadPoolSize);
    }

    /** With core thread time-out disabled, the workers survive the keep-alive period. */
    @Test
    public void threadsShouldBePresentAfterTimeoutIfWeDontAllowThreadTimeout()
            throws InterruptedException {
        threadPoolExecutor.allowCoreThreadTimeOut(false);
        CountDownLatch untilAllThreadsDone = startaBunchOfThreads();
        untilAllThreadsDone.await();
        waitUntilTimeoutOccurs();
        assertEquals(countExecutorThreads(), threadPoolSize);
    }

    /**
     * Builds the executor under test: fixed size of {@link #threadPoolSize} threads, a bounded
     * queue of twice that capacity, identifiable thread names and core thread time-out enabled.
     *
     * @return a new executor that has not started any thread yet
     */
    private ThreadPoolExecutor prepareThreadPoolExecutor() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2 * threadPoolSize);
        ThreadPoolExecutor threadPoolExecutorLocal = new ThreadPoolExecutor(
                threadPoolSize, threadPoolSize,
                timeoutMillis, TimeUnit.MILLISECONDS,
                queue, new IdentifiableThreadFactory());
        threadPoolExecutorLocal.allowCoreThreadTimeOut(true);
        return threadPoolExecutorLocal;
    }

    /**
     * Submits {@link #threadPoolSize} trivial tasks to the executor.
     *
     * @return a latch that reaches zero once every submitted task has run
     */
    private CountDownLatch startaBunchOfThreads() {
        final CountDownLatch allThreadsDoneWaiter = new CountDownLatch(threadPoolSize);
        for (int i = 0; i < threadPoolSize; i++) {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    allThreadsDoneWaiter.countDown();
                }
            });
        }
        return allThreadsDoneWaiter;
    }

    /** Sleeps for half the keep-alive time, i.e. returns before idle workers may time out. */
    private void waitBeforeTimeoutOccurs() throws InterruptedException {
        Thread.sleep(timeoutMillis / 2);
    }

    /** Sleeps for twice the keep-alive time, i.e. past the point where idle workers time out. */
    private void waitUntilTimeoutOccurs() throws InterruptedException {
        Thread.sleep(timeoutMillis * 2);
    }

    /**
     * Polls {@link #countExecutorThreads()} until it equals {@code expected} or
     * {@link #MAX_WAIT_MILLIS} have elapsed.
     *
     * @param expected the thread count to wait for
     * @return the last observed thread count; equal to {@code expected} unless the wait
     *         timed out
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private int awaitExecutorThreadCount(int expected) throws InterruptedException {
        final long t0 = System.nanoTime();
        int observed = countExecutorThreads();
        while (observed != expected && millisElapsedSince(t0) < MAX_WAIT_MILLIS) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            observed = countExecutorThreads();
        }
        return observed;
    }

    /**
     * Thread factory that delegates to a shared default factory and prefixes each thread name
     * with {@code "CoreThreadTimeOut-"} so the test can find its worker threads by name.
     */
    static class IdentifiableThreadFactory implements ThreadFactory {
        static final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = defaultThreadFactory.newThread(r);
            t.setName("CoreThreadTimeOut-" + t.getName());
            return t;
        }
    }

    /**
     * Counts the live threads, as reported by {@link Thread#enumerate(Thread[])}, whose name
     * matches {@link #EXECUTOR_THREAD_NAME}.
     *
     * @return number of matching threads
     */
    private int countExecutorThreads() {
        Thread[] threads;
        int filled;
        int capacity = Thread.activeCount() + 100;
        // enumerate() silently drops threads that do not fit, so a completely filled array
        // may be truncated: retry with a larger one until there is spare room.
        do {
            threads = new Thread[capacity];
            filled = Thread.enumerate(threads);
            capacity *= 2;
        } while (filled >= threads.length);

        int count = 0;
        for (int i = 0; i < filled; i++) {
            if (EXECUTOR_THREAD_NAME.matcher(threads[i].getName()).matches()) {
                count++;
            }
        }
        return count;
    }

    /**
     * Converts the time elapsed since a {@link System#nanoTime()} reading to milliseconds.
     *
     * @param t0 an earlier value returned by {@link System#nanoTime()}
     * @return whole milliseconds elapsed since {@code t0}
     */
    private long millisElapsedSince(long t0) {
        return (System.nanoTime() - t0) / (1000L * 1000L);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment