import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jpos.util.NameRegistrar.NotFoundException;
import org.junit.Test;
public class SomeConcurrencyTest {
static final int TOTAL_THREADS_TO_RUN = 1000;
@Test
public void testConcurrency() throws Exception {
List<Runnable> parrallelTasksList = new ArrayList<Runnable>(TOTAL_THREADS_TO_RUN);
for (int i = 0; i < TOTAL_THREADS_TO_RUN; i++) {
final int counter = i;
parrallelTasksList.add(new Runnable() {
public void run() {
// <PSEUDOCODE>
setup object to test and invoke
assertThat(someMultiThreadSafeObjectToTest, is(tested));
// </PSEUDOCODE>
}
});
}
int maxTimeoutSeconds = 5;
assertConcurrent("somemethod must be ThreadSafe", parrallelTasksList, maxTimeoutSeconds);
}
public static void assertConcurrent(final String message, final List<? extends Runnable> runnables, final int maxTimeoutSeconds) throws InterruptedException {
final int numThreads = runnables.size();
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
try {
final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
final CountDownLatch afterInitBlocker = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(numThreads);
for (final Runnable submittedTestRunnable : runnables) {
threadPool.submit(new Runnable() {
public void run() {
allExecutorThreadsReady.countDown();
try {
afterInitBlocker.await();
submittedTestRunnable.run();
} catch (final Throwable e) {
exceptions.add(e);
} finally {
allDone.countDown();
}
}
});
}
// wait until all threads are ready
assertTrue("Timeout initializing threads! Perform long lasting initializations before passing runnables to assertConcurrent", allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS));
// start all test runners
afterInitBlocker.countDown();
assertTrue(message +" timeout! More than" + maxTimeoutSeconds + "seconds", allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS));
} finally {
threadPool.shutdownNow();
}
assertTrue(message + "failed with exception(s)" + exceptions, exceptions.isEmpty());
}
I explain shortly the most important parts of this test. For every runnable we create an adaptor runnable that takes care of waiting for all threads to be created, submitting failures to our exceptions list and notification when processing finished. It is very important to catch Throwable since we expect anything to happen from AssertionException, ConcurrentModificationException to Business Exceptions. The adaptor runnables are being put into a jdk provided thread pool of the same size as the number of runnables. Just after being put into the thread pool an added thread executes and waits on
afterInitBlocker.await();
while the starting thread (may) waits on
allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);
This way it is guaranteed that when we call
afterInitBlocker.countDown();
from the starting thread that all testing threads are fully initialized.
afterInitBlocker.countDown();
notifies all waiting threads and they start testing
submittedTestRunnable.run();
By starting all testing threads this way we achieve the maximum concurrent test load possible. The inner finally block assures that our testing thread is notified whether we catch a failure or the test runs smoothly. While the test threads execute whatever they are being put up to the starting thread waits at
allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS);
It is possible the await fails with an Exception when the timeout is reached. This could be caused by a deadlock, thread starvation, other processing on the testing machine or even by the timeout being too short. The thread pool is stopped in any case – even when we got a timeout. At the end we check if any thread has aborted with an exception.
What's the license on this? Is it open source I can use legally or just something I can read?