Skip to content

Instantly share code, notes, and snippets.

@adamw
Created October 31, 2023 09:53
Show Gist options
  • Save adamw/faa25c328e262c4b59e7a5287edfc6ba to your computer and use it in GitHub Desktop.
Save adamw/faa25c328e262c4b59e7a5287edfc6ba to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
 * Micro-benchmark of a hand-rolled rendezvous (synchronous hand-off) between exactly two
 * platform threads: a producer that sends the integers {@code 0..max} and a consumer that
 * sums them.
 *
 * <p>Protocol: whichever thread arrives first publishes a {@link ThreadAndCell} in
 * {@link #waiting} via CAS and then waits (spin, then yield, then park) until the partner
 * has touched its cell. The thread that arrives second finds {@code waiting} occupied,
 * clears it for the next round, exchanges the value through the first thread's cell and
 * unparks it.
 *
 * <p>The protocol relies on there being exactly one producer and one consumer per instance
 * at any time; it is not a general-purpose exchanger.
 */
public class Rendezvous {
    /**
     * Cell content meaning "no value here": a consumer cell that has not been filled yet, or a
     * producer cell whose value has been taken. Transferred values are {@code 0..max}, i.e.
     * never negative, so they cannot collide with this sentinel.
     */
    private static final int NO_DATA = -1;

    /** Number of the last value transferred by the no-argument {@link #test()}. */
    private static final int DEFAULT_MAX = 10_000_000;

    private final int spinIterations;
    private final int yieldIterations;

    /** The party currently waiting for its partner, or {@code null} when nobody is waiting. */
    private final AtomicReference<ThreadAndCell> waiting = new AtomicReference<>();

    /**
     * @param spinIterations  how many times a waiting thread calls {@link Thread#onSpinWait()}
     *                        before it starts yielding
     * @param yieldIterations how many times a waiting thread calls {@link Thread#yield()}
     *                        (after spinning) before it starts parking
     */
    public Rendezvous(int spinIterations, int yieldIterations) {
        this.spinIterations = spinIterations;
        this.yieldIterations = yieldIterations;
    }

    /**
     * Runs the benchmark with the default workload ({@code 0..10_000_000}) and prints the
     * elapsed time.
     *
     * @throws Exception if the calling thread is interrupted while joining the workers, or if
     *                   the consumer did not receive the expected sum
     */
    public void test() throws Exception {
        test(DEFAULT_MAX);
    }

    /**
     * Transfers the values {@code 0..max} from a producer thread to a consumer thread, verifies
     * the received sum and prints the elapsed time to standard output.
     *
     * @param max the last value to transfer; must satisfy {@code 0 <= max < Integer.MAX_VALUE}
     * @throws IllegalArgumentException if {@code max} is out of range
     * @throws IllegalStateException    if the consumer's sum differs from {@code 0 + 1 + ... + max}
     * @throws Exception                if the calling thread is interrupted while joining the workers
     */
    public void test(int max) throws Exception {
        // nanoTime is monotonic; currentTimeMillis follows the wall clock and may jump.
        long start = System.nanoTime();
        long received = runExchange(max);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;

        // Verified on the calling thread so that a mismatch actually fails the run instead of
        // only terminating a worker thread (and only when assertions happen to be enabled).
        long expected = sumUpTo(max);
        if (received != expected) {
            throw new IllegalStateException(
                    "Consumer received sum " + received + " but expected " + expected);
        }
        System.out.println("Took (spin=" + spinIterations + ", yield=" + yieldIterations + "): " + elapsedMillis + " ms");
    }

    /**
     * Starts one producer and one consumer platform thread, waits for both to finish and
     * returns the sum of all values the consumer received.
     *
     * @param max the last value to transfer; must satisfy {@code 0 <= max < Integer.MAX_VALUE}
     * @return the sum accumulated by the consumer
     * @throws IllegalArgumentException if {@code max} is out of range
     * @throws InterruptedException     if the calling thread is interrupted while joining
     */
    long runExchange(int max) throws InterruptedException {
        if (max < 0 || max == Integer.MAX_VALUE) {
            // MAX_VALUE is excluded because the workers' `i <= max` loops would never terminate.
            throw new IllegalArgumentException("max must be in [0, Integer.MAX_VALUE): " + max);
        }
        // Written by the consumer thread, read after join(); join() provides the required
        // happens-before edge, so a plain array slot is sufficient.
        final long[] consumedSum = new long[1];
        Thread producer = Thread.ofPlatform().start(() -> produce(max));
        Thread consumer = Thread.ofPlatform().start(() -> consumedSum[0] = consume(max));
        producer.join();
        consumer.join();
        return consumedSum[0];
    }

    /** Producer side: offers each of the values {@code 0..max} to the consumer, one per rendezvous. */
    private void produce(int max) {
        Thread self = Thread.currentThread();
        for (int value = 0; value <= max; value++) {
            AtomicReference<Integer> cell = new AtomicReference<>(value);
            if (waiting.compareAndSet(null, new ThreadAndCell(self, cell))) {
                // CAS succeeded, we arrived first: wait until the consumer has taken the value
                // (it marks our cell with NO_DATA).
                awaitCell(cell, true);
            } else {
                // CAS failed, the consumer is already waiting: clear `waiting` for the next
                // round, hand the value over through its cell and wake it up.
                ThreadAndCell other = waiting.getAndSet(null);
                other.cell().set(value);
                LockSupport.unpark(other.thread());
            }
        }
    }

    /**
     * Consumer side: takes part in {@code max + 1} rendezvous and sums the received values.
     *
     * @return the sum of all values received
     */
    private long consume(int max) {
        long sum = 0L;
        Thread self = Thread.currentThread();
        for (int round = 0; round <= max; round++) {
            AtomicReference<Integer> cell = new AtomicReference<>(NO_DATA);
            if (waiting.compareAndSet(null, new ThreadAndCell(self, cell))) {
                // CAS succeeded, we arrived first: wait until the producer has filled our cell.
                awaitCell(cell, false);
                sum += cell.get();
            } else {
                // CAS failed, the producer is already waiting: clear `waiting` for the next
                // round, take the value, mark the cell as consumed and wake the producer up.
                ThreadAndCell other = waiting.getAndSet(null);
                sum += other.cell().get();
                other.cell().set(NO_DATA);
                LockSupport.unpark(other.thread());
            }
        }
        return sum;
    }

    /**
     * Blocks the calling thread until {@code cell} reaches the requested state, backing off in
     * three stages: {@code spinIterations} busy-spins, then {@code yieldIterations} yields,
     * then parking until unparked.
     *
     * @param cell         the cell the partner thread will update
     * @param waitForEmpty {@code true} to wait until the cell holds {@link #NO_DATA} (producer),
     *                     {@code false} to wait until it holds anything else (consumer)
     */
    private void awaitCell(AtomicReference<Integer> cell, boolean waitForEmpty) {
        int spinsLeft = spinIterations;
        int yieldsLeft = yieldIterations;
        // The condition is re-checked after every step, so an early return from park()
        // (leftover permit, spurious wake-up) is harmless.
        while ((cell.get() == NO_DATA) != waitForEmpty) {
            if (spinsLeft > 0) {
                Thread.onSpinWait();
                spinsLeft--;
            } else if (yieldsLeft > 0) {
                Thread.yield();
                yieldsLeft--;
            } else {
                LockSupport.park();
            }
        }
    }

    /** Returns {@code 0 + 1 + ... + max}, computed entirely in {@code long} to avoid int overflow. */
    private static long sumUpTo(int max) {
        return (long) max * ((long) max + 1L) / 2L;
    }

    /** A waiting thread together with the cell through which its partner exchanges the value. */
    private record ThreadAndCell(Thread thread, AtomicReference<Integer> cell) {}

    /** Runs the default benchmark 100 times with 10,000 spin iterations and no yielding. */
    public static void main(String[] args) throws Exception {
        for (int run = 0; run < 100; run++) {
            new Rendezvous(10_000, 0).test();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment