Skip to content

Instantly share code, notes, and snippets.

@Andrei-Pozolotin
Last active January 3, 2017 20:10
Show Gist options
  • Save Andrei-Pozolotin/ff912744323d3dbb758b890e81365463 to your computer and use it in GitHub Desktop.
Save Andrei-Pozolotin/ff912744323d3dbb758b890e81365463 to your computer and use it in GitHub Desktop.
BUG: appender -> tailer synchronization contract: https://github.com/OpenHFT/Chronicle-Queue/issues/317
package demo;
import java.text.SimpleDateFormat;
import java.util.Date;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
public class Main {
// message writer with notification on every write
static class Producer extends Thread {
final ExcerptAppender appender;
final int queueSize;
final Object signal;
Producer(ExcerptAppender appender, int queueSize, Object signal) {
this.appender = appender;
this.queueSize = queueSize;
this.signal = signal;
}
@Override
public void run() {
log("writer: start");
for (int sequence = 0; sequence < queueSize; sequence++) {
DocumentContext doc = appender.writingDocument();
try {
doc.wire().bytes().writeInt(sequence);
} finally {
doc.close();
}
synchronized (signal) {
signal.notifyAll();
}
}
log("writer: finish");
}
}
// message reader with notification when needed
static class Consumer extends Thread {
final ExcerptTailer tailer;
final Object signal;
Consumer(ExcerptTailer tailer, Object signal) {
this.tailer = tailer;
this.signal = signal;
}
// number of queue reads
volatile int queueSize = 0;
// number of queue waits
volatile int queueWait = 0;
// defer read if not ready
boolean assertConinue() {
boolean wait = false;
DocumentContext doc = tailer.readingDocument();
try {
if (doc.isPresent()) {
assertSequence(doc);
queueSize++;
} else {
wait = true;
queueWait++;
}
} finally {
doc.close();
}
// report wait request
return wait;
}
// verify can read immediately
void assertPresent() {
DocumentContext doc = tailer.readingDocument();
try {
if (doc.isPresent()) {
assertSequence(doc);
queueSize++;
} else {
throw new Error("not present");
}
} finally {
doc.close();
}
}
int sequence = 0;
// verify reader sequence
void assertSequence(DocumentContext doc) {
if (sequence == doc.wire().bytes().readInt()) {
sequence++;
} else {
throw new Error("wrong sequence");
}
}
@Override
public void run() {
log("reader: start");
while (true) {
boolean wait = assertConinue();
if (wait) {
synchronized (signal) {
try {
signal.wait();
} catch (InterruptedException e) {
log("reader: finish");
return;
}
}
assertPresent(); // comment this out to make it work
}
}
}
}
static SimpleDateFormat date_time = new SimpleDateFormat("HH:mm:ss.SSS");
static void log(String text) {
System.out.println(date_time.format(new Date()) + " " + text);
}
public static void main(String[] args) throws Exception {
int blockSize = 1024 * 1024;
String path = "target/queue" + System.currentTimeMillis();
SingleChronicleQueue queue = SingleChronicleQueueBuilder //
.binary(path) //
.blockSize(blockSize) //
.rollCycle(RollCycles.TEST_SECONDLY).build();
int queueSize = 100_000;
// notification lock
String signal = "signal";
// message producer
Producer writer = new Producer(queue.acquireAppender(), queueSize, signal);
// message consumer
Consumer reader1 = new Consumer(queue.createTailer(), signal);
Consumer reader2 = new Consumer(queue.createTailer(), signal);
Consumer reader3 = new Consumer(queue.createTailer(), signal);
log("all start");
writer.start();
reader1.start();
reader2.start();
reader3.start();
log("writer stop");
writer.join();
log("readers complete");
Thread.sleep(3 * 1000);
log("readers stop");
reader1.interrupt();
reader2.interrupt();
reader3.interrupt();
reader1.join();
reader2.join();
reader3.join();
log("report result");
log("queueSize=" + queueSize);
log("reader 1 size=" + reader1.queueSize + " wait=" + reader1.queueWait);
log("reader 2 size=" + reader2.queueSize + " wait=" + reader2.queueWait);
log("reader 3 size=" + reader3.queueSize + " wait=" + reader3.queueWait);
}
}
13:25:10.354 all start
13:25:10.355 writer: start
13:25:10.355 reader: start
13:25:10.356 reader: start
13:25:10.356 writer stop
13:25:10.356 reader: start
Exception in thread "Thread-3" Exception in thread "Thread-2" Exception in thread "Thread-1" java.lang.Error: not present
at demo.Main$Consumer.assertPresent(Main.java:88)
at demo.Main$Consumer.run(Main.java:120)
java.lang.Error: not present
at demo.Main$Consumer.assertPresent(Main.java:88)
at demo.Main$Consumer.run(Main.java:120)
java.lang.Error: not present
at demo.Main$Consumer.assertPresent(Main.java:88)
at demo.Main$Consumer.run(Main.java:120)
13:25:10.608 writer: finish
13:25:10.608 readers complete
13:25:13.608 readers stop
13:25:13.609 report result
13:25:13.609 queueSize=100000
13:25:13.609 reader 1 size=177 wait=7
13:25:13.609 reader 2 size=169 wait=3
13:25:13.609 reader 3 size=165 wait=7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment