Created
October 4, 2011 08:51
-
-
Save grignaak/1261175 to your computer and use it in GitHub Desktop.
Wait free vs SynchronizedDeque vs Partially Synchronized Queues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The wait-free algorithm (WaitFreeQueueRunner) is at the very bottom of this file.
package performance; | |
import java.util.ArrayDeque; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import org.junit.experimental.theories.DataPoint; | |
import org.junit.experimental.theories.Theories; | |
import org.junit.experimental.theories.Theory; | |
import org.junit.runner.RunWith; | |
@RunWith(Theories.class) | |
public class CompareQueues { | |
private static int OP_COUNT = 1000000; | |
public static @DataPoint QueueRunner blocking = new SimpleQueueRunner(new LinkedBlockingQueue<Integer>()); | |
public static @DataPoint QueueRunner concurrent = new SimpleQueueRunner(new ConcurrentLinkedQueue<Integer>()); | |
public static @DataPoint QueueRunner array = new SynchronizedQueueRunner(new ArrayDeque<Integer>()); | |
public static @DataPoint QueueRunner chunkConcurrent = new WaitFreeQueueRunner(new ConcurrentLinkedQueue<Integer>()); | |
@Theory public void | |
compare(QueueRunner queue) throws InterruptedException { | |
long start = System.nanoTime(); | |
for (int i = 0; i < OP_COUNT; i++) { | |
queue.add(i); | |
} | |
queue.await(); | |
long end = System.nanoTime(); | |
long runningTime = end - start; | |
double timePerOp = runningTime / OP_COUNT; // includes thread overhead | |
System.out.println(queue + " : " + timePerOp + " nanos / op"); | |
queue.shutdown(); | |
} | |
private static abstract class QueueRunner { | |
protected final Queue<Integer> queue; | |
private final ExecutorService exec = Executors.newFixedThreadPool(2); | |
private Accounting accounting = new Accounting(); | |
private Runnable processor = new Runnable() { | |
public void run() { | |
process(accounting); | |
} | |
}; | |
QueueRunner(Queue<Integer> queue) { this.queue = queue; } | |
public void await() throws InterruptedException { accounting.await(); } | |
public void shutdown() throws InterruptedException { | |
exec.shutdownNow(); | |
while (!exec.isTerminated()) | |
exec.awaitTermination(1, TimeUnit.SECONDS); | |
} | |
protected void exec() { | |
exec.execute(processor); | |
} | |
public String toString() { | |
return queue.getClass().getSimpleName(); | |
} | |
abstract void add(Integer item); | |
protected abstract void process(Accounting accounting); | |
} | |
private static class Accounting { | |
private final CountDownLatch done = new CountDownLatch(OP_COUNT); | |
private Integer expected = 0; | |
public void await() throws InterruptedException { done.await(); } | |
public void apply(Integer i) { | |
if (!expected.equals(i)) throw new RuntimeException(); | |
expected++; | |
done.countDown(); | |
} | |
} | |
public static class SynchronizedQueueRunner extends QueueRunner { | |
public SynchronizedQueueRunner(Queue<Integer> queue) { | |
super(queue); | |
} | |
public synchronized void add(Integer i) { | |
queue.add(i); | |
exec(); | |
} | |
protected synchronized void process(Accounting accounting) { | |
accounting.apply(queue.remove()); | |
} | |
public String toString() { | |
return "synchronized " + super.toString(); | |
} | |
} | |
public static class SimpleQueueRunner extends QueueRunner { | |
SimpleQueueRunner(Queue<Integer> queue) { | |
super(queue); | |
} | |
public void add(Integer i) { | |
queue.add(i); | |
exec(); | |
} | |
// NOTE: This is synchronized because accounting.apply is not! | |
protected synchronized void process(Accounting accounting) { | |
accounting.apply(queue.remove()); | |
} | |
} | |
public static class WaitFreeQueueRunner extends QueueRunner { | |
private volatile boolean enqueued = false; // NOTE: this flag is just an optimization | |
private AtomicBoolean lock = new AtomicBoolean(); | |
private static int MAX_ITEMS = 100; | |
public WaitFreeQueueRunner(Queue<Integer> queue) { | |
super(queue); | |
} | |
public void add(Integer i) { | |
queue.add(i); | |
if (!enqueued) { | |
enqueued = true; | |
exec(); | |
} | |
} | |
protected void process(Accounting accounting) { | |
if (lock.getAndSet(true)) return; | |
for (int count = 0; !queue.isEmpty() && count < MAX_ITEMS; count++) { | |
// This represents long work where everything must be ordered | |
accounting.apply(queue.remove()); | |
} | |
lock.set(false); | |
enqueued = false; // NOTE! Important that this comes after unlocking | |
if (!queue.isEmpty() && !enqueued) { | |
enqueued = true; | |
exec(); | |
} | |
} | |
public String toString() { | |
return "wait-free with " + super.toString(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment