Created
September 25, 2011 07:02
-
-
Save monzou/1240319 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox.wait; | |
import java.util.concurrent.BrokenBarrierException; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.CyclicBarrier; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import junit.framework.TestCase; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.google.common.collect.Maps; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
public class Sandbox extends TestCase { | |
private static final Logger LOGGER = LoggerFactory.getLogger(Sandbox.class); | |
public void test() throws Exception { | |
final Session session = new Session(); | |
ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-thread-%s").build()); | |
int nThreads = 15; | |
final CyclicBarrier startBarrier = new CyclicBarrier(nThreads + 1); | |
final CyclicBarrier endBarrier = new CyclicBarrier(nThreads + 1); | |
for (int i = 0; i < nThreads; i++) { | |
final String taskName = String.format("task-%d", i); | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
startBarrier.await(); | |
} catch (InterruptedException e) { | |
fail(); | |
} catch (BrokenBarrierException e) { | |
fail(); | |
} | |
LOGGER.debug("submit task: {}", taskName); | |
session.run(taskName); | |
try { | |
endBarrier.await(); | |
} catch (InterruptedException e) { | |
fail(); | |
} catch (BrokenBarrierException e) { | |
fail(); | |
} | |
LOGGER.debug("finished: {}", taskName); | |
} | |
}); | |
} | |
LOGGER.debug(">>> START SUBMITTING"); | |
startBarrier.await(); | |
endBarrier.await(); | |
LOGGER.debug(">>> END SUBMITTING"); | |
executor.shutdown(); | |
LOGGER.debug("runnning tasks..."); | |
assertFalse(session.isEmpty()); | |
LOGGER.debug("waiting ..."); | |
session.waitAndDestroy(); | |
LOGGER.debug("returned."); | |
assertTrue(session.isEmpty()); | |
} | |
private static class Session { | |
private final ExecutorService executor; | |
private final ConcurrentMap<Runnable, Future<?>> futures; | |
Session() { | |
executor = Executors.newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("worker-thread-%s").build()); | |
futures = Maps.newConcurrentMap(); | |
} | |
synchronized boolean isEmpty() { | |
return futures.isEmpty(); | |
} | |
synchronized void run(final String taskName) { | |
Runnable command = new Runnable() { | |
@Override | |
public void run() { | |
try { | |
Thread.sleep(3000); | |
LOGGER.debug("consume: {}", taskName); | |
} catch (InterruptedException e) { | |
LOGGER.warn("task thread has been interrupted: {}", taskName); | |
} finally { | |
futures.remove(this); | |
} | |
} | |
}; | |
Future<?> future = executor.submit(command); | |
futures.put(command, future); | |
if (future.isDone()) { | |
futures.remove(command); | |
} | |
} | |
synchronized void waitAndDestroy() { | |
for (Future<?> future : futures.values()) { | |
try { | |
future.get(); | |
} catch (InterruptedException e) { | |
LOGGER.warn("task thread has been interrupted.", e); | |
} catch (ExecutionException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
futures.clear(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment