Skip to content

Instantly share code, notes, and snippets.

@monzou
Created September 25, 2011 07:02
Show Gist options
  • Save monzou/1240319 to your computer and use it in GitHub Desktop.
Save monzou/1240319 to your computer and use it in GitHub Desktop.
package sandbox.wait;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Sandbox test for a small "submit tasks, then wait for all of them" helper.
 *
 * <p>Several client threads submit one task each to a shared {@link Session}
 * at the same moment (coordinated with two {@link CyclicBarrier}s). The test
 * then checks that the session reports outstanding work, waits for that work
 * through {@link Session#waitAndDestroy()}, and checks that the session is
 * empty afterwards.
 */
public class Sandbox extends TestCase {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sandbox.class);

    /** Upper bound, in seconds, for each blocking wait performed by the test thread. */
    private static final long WAIT_TIMEOUT_SECONDS = 10L;

    /**
     * Submits tasks from {@code nThreads} client threads simultaneously and
     * verifies that {@link Session#waitAndDestroy()} drains all of them.
     *
     * @throws Exception if a barrier wait times out or is broken, or if any
     *         client thread failed (reported as an {@link ExecutionException})
     */
    public void test() throws Exception {
        final Session session = new Session();
        ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-thread-%s").build());
        int nThreads = 15;
        // "+ 1": the test thread is a party of both barriers as well.
        final CyclicBarrier startBarrier = new CyclicBarrier(nThreads + 1);
        final CyclicBarrier endBarrier = new CyclicBarrier(nThreads + 1);
        List<Future<?>> clients = new ArrayList<Future<?>>(nThreads);
        try {
            for (int i = 0; i < nThreads; i++) {
                final String taskName = String.format("task-%d", i);
                // Submitted as a Callable so that any exception raised on the
                // client thread is captured in the Future and can be rethrown
                // on the test thread; an assertion failure raised directly on
                // a pool thread would never reach the test runner.
                clients.add(executor.submit(new Callable<Void>() {

                    @Override
                    public Void call() throws Exception {
                        startBarrier.await();
                        LOGGER.debug("submit task: {}", taskName);
                        session.run(taskName);
                        endBarrier.await();
                        LOGGER.debug("finished: {}", taskName);
                        return null;
                    }
                }));
            }

            LOGGER.debug(">>> START SUBMITTING");
            // Timed waits: if a client dies before reaching a barrier the test
            // fails with a TimeoutException instead of hanging forever. A timeout
            // also breaks the barrier, which releases the remaining clients.
            startBarrier.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            endBarrier.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            LOGGER.debug(">>> END SUBMITTING");
            executor.shutdown();

            // Surface any failure that happened on a client thread.
            for (Future<?> client : clients) {
                client.get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }

            LOGGER.debug("runnning tasks...");
            // Every task sleeps for three seconds, so work is still pending here.
            assertFalse(session.isEmpty());

            LOGGER.debug("waiting ...");
            session.waitAndDestroy();
            LOGGER.debug("returned.");
            assertTrue(session.isEmpty());
        } finally {
            // Safety net for the failure paths: interrupts clients that are
            // still parked on a barrier. A no-op after a normal shutdown.
            executor.shutdownNow();
        }
    }

    /**
     * Runs named tasks on a fixed pool of ten worker threads and tracks the
     * {@link Future} of every task that has not finished yet.
     *
     * <p>All public operations are {@code synchronized} on the session. The
     * tasks themselves only touch the concurrent map and never take the
     * session monitor, so {@link #waitAndDestroy()} can block on them while
     * holding it.
     */
    private static class Session {

        private final ExecutorService executor;
        /** Outstanding tasks, keyed by the task object itself. */
        private final ConcurrentMap<Runnable, Future<?>> futures;

        Session() {
            executor = Executors.newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("worker-thread-%s").build());
            futures = Maps.newConcurrentMap();
        }

        /**
         * @return {@code true} when no submitted task is still being tracked
         */
        synchronized boolean isEmpty() {
            return futures.isEmpty();
        }

        /**
         * Schedules a task that sleeps for three seconds, logs its name and
         * then removes itself from the set of tracked tasks.
         *
         * @param taskName name used in the task's log messages
         * @throws java.util.concurrent.RejectedExecutionException if the
         *         session has already been destroyed
         */
        synchronized void run(final String taskName) {
            Runnable command = new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        LOGGER.debug("consume: {}", taskName);
                    } catch (InterruptedException e) {
                        LOGGER.warn("task thread has been interrupted: {}", taskName);
                        // Keep the interrupt visible to the code running this task.
                        Thread.currentThread().interrupt();
                    } finally {
                        futures.remove(this);
                    }
                }
            };
            // Register the future BEFORE the task can start. Registering after
            // submission lets a fast task run its remove() first and then leaves
            // a stale entry behind that nothing ever clears.
            FutureTask<Void> task = new FutureTask<Void>(command, null);
            futures.put(command, task);
            try {
                executor.execute(task);
            } catch (RuntimeException e) {
                // The task will never run, so it can never unregister itself.
                futures.remove(command);
                throw e;
            }
        }

        /**
         * Blocks until every tracked task has finished, then releases the
         * worker pool. The session accepts no further tasks afterwards.
         *
         * <p>If the calling thread is interrupted while waiting, the remaining
         * tasks are cancelled via {@code shutdownNow()} and the interrupt flag
         * of the caller is restored.
         *
         * @throws RuntimeException wrapping the {@link ExecutionException} of
         *         a task that terminated with an exception
         */
        synchronized void waitAndDestroy() {
            // Stop accepting work; tasks that are already queued still run.
            executor.shutdown();
            try {
                for (Future<?> future : futures.values()) {
                    try {
                        future.get();
                    } catch (InterruptedException e) {
                        LOGGER.warn("task thread has been interrupted.", e);
                        executor.shutdownNow();
                        Thread.currentThread().interrupt();
                        break;
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            } finally {
                futures.clear();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment