Last active
January 3, 2017 20:10
-
-
Save Andrei-Pozolotin/ff912744323d3dbb758b890e81365463 to your computer and use it in GitHub Desktop.
BUG: appender -> tailer synchronization contract: https://github.com/OpenHFT/Chronicle-Queue/issues/317
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package demo; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import net.openhft.chronicle.queue.ExcerptAppender; | |
import net.openhft.chronicle.queue.ExcerptTailer; | |
import net.openhft.chronicle.queue.RollCycles; | |
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; | |
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; | |
import net.openhft.chronicle.wire.DocumentContext; | |
public class Main { | |
// message writer with notification on every write | |
static class Producer extends Thread { | |
final ExcerptAppender appender; | |
final int queueSize; | |
final Object signal; | |
Producer(ExcerptAppender appender, int queueSize, Object signal) { | |
this.appender = appender; | |
this.queueSize = queueSize; | |
this.signal = signal; | |
} | |
@Override | |
public void run() { | |
log("writer: start"); | |
for (int sequence = 0; sequence < queueSize; sequence++) { | |
DocumentContext doc = appender.writingDocument(); | |
try { | |
doc.wire().bytes().writeInt(sequence); | |
} finally { | |
doc.close(); | |
} | |
synchronized (signal) { | |
signal.notifyAll(); | |
} | |
} | |
log("writer: finish"); | |
} | |
} | |
// message reader with notification when needed | |
static class Consumer extends Thread { | |
final ExcerptTailer tailer; | |
final Object signal; | |
Consumer(ExcerptTailer tailer, Object signal) { | |
this.tailer = tailer; | |
this.signal = signal; | |
} | |
// number of queue reads | |
volatile int queueSize = 0; | |
// number of queue waits | |
volatile int queueWait = 0; | |
// defer read if not ready | |
boolean assertConinue() { | |
boolean wait = false; | |
DocumentContext doc = tailer.readingDocument(); | |
try { | |
if (doc.isPresent()) { | |
assertSequence(doc); | |
queueSize++; | |
} else { | |
wait = true; | |
queueWait++; | |
} | |
} finally { | |
doc.close(); | |
} | |
// report wait request | |
return wait; | |
} | |
// verify can read immediately | |
void assertPresent() { | |
DocumentContext doc = tailer.readingDocument(); | |
try { | |
if (doc.isPresent()) { | |
assertSequence(doc); | |
queueSize++; | |
} else { | |
throw new Error("not present"); | |
} | |
} finally { | |
doc.close(); | |
} | |
} | |
int sequence = 0; | |
// verify reader sequence | |
void assertSequence(DocumentContext doc) { | |
if (sequence == doc.wire().bytes().readInt()) { | |
sequence++; | |
} else { | |
throw new Error("wrong sequence"); | |
} | |
} | |
@Override | |
public void run() { | |
log("reader: start"); | |
while (true) { | |
boolean wait = assertConinue(); | |
if (wait) { | |
synchronized (signal) { | |
try { | |
signal.wait(); | |
} catch (InterruptedException e) { | |
log("reader: finish"); | |
return; | |
} | |
} | |
assertPresent(); // comment this out to make it work | |
} | |
} | |
} | |
} | |
static SimpleDateFormat date_time = new SimpleDateFormat("HH:mm:ss.SSS"); | |
static void log(String text) { | |
System.out.println(date_time.format(new Date()) + " " + text); | |
} | |
public static void main(String[] args) throws Exception { | |
int blockSize = 1024 * 1024; | |
String path = "target/queue" + System.currentTimeMillis(); | |
SingleChronicleQueue queue = SingleChronicleQueueBuilder // | |
.binary(path) // | |
.blockSize(blockSize) // | |
.rollCycle(RollCycles.TEST_SECONDLY).build(); | |
int queueSize = 100_000; | |
// notification lock | |
String signal = "signal"; | |
// message producer | |
Producer writer = new Producer(queue.acquireAppender(), queueSize, signal); | |
// message consumer | |
Consumer reader1 = new Consumer(queue.createTailer(), signal); | |
Consumer reader2 = new Consumer(queue.createTailer(), signal); | |
Consumer reader3 = new Consumer(queue.createTailer(), signal); | |
log("all start"); | |
writer.start(); | |
reader1.start(); | |
reader2.start(); | |
reader3.start(); | |
log("writer stop"); | |
writer.join(); | |
log("readers complete"); | |
Thread.sleep(3 * 1000); | |
log("readers stop"); | |
reader1.interrupt(); | |
reader2.interrupt(); | |
reader3.interrupt(); | |
reader1.join(); | |
reader2.join(); | |
reader3.join(); | |
log("report result"); | |
log("queueSize=" + queueSize); | |
log("reader 1 size=" + reader1.queueSize + " wait=" + reader1.queueWait); | |
log("reader 2 size=" + reader2.queueSize + " wait=" + reader2.queueWait); | |
log("reader 3 size=" + reader3.queueSize + " wait=" + reader3.queueWait); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13:25:10.354 all start | |
13:25:10.355 writer: start | |
13:25:10.355 reader: start | |
13:25:10.356 reader: start | |
13:25:10.356 writer stop | |
13:25:10.356 reader: start | |
Exception in thread "Thread-3" Exception in thread "Thread-2" Exception in thread "Thread-1" java.lang.Error: not present | |
at demo.Main$Consumer.assertPresent(Main.java:88) | |
at demo.Main$Consumer.run(Main.java:120) | |
java.lang.Error: not present | |
at demo.Main$Consumer.assertPresent(Main.java:88) | |
at demo.Main$Consumer.run(Main.java:120) | |
java.lang.Error: not present | |
at demo.Main$Consumer.assertPresent(Main.java:88) | |
at demo.Main$Consumer.run(Main.java:120) | |
13:25:10.608 writer: finish | |
13:25:10.608 readers complete | |
13:25:13.608 readers stop | |
13:25:13.609 report result | |
13:25:13.609 queueSize=100000 | |
13:25:13.609 reader 1 size=177 wait=7 | |
13:25:13.609 reader 2 size=169 wait=3 | |
13:25:13.609 reader 3 size=165 wait=7 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment