Skip to content

Instantly share code, notes, and snippets.

@dejanb
Created June 20, 2011 16:21
Show Gist options
  • Save dejanb/1035920 to your computer and use it in GitHub Desktop.
Save dejanb/1035920 to your computer and use it in GitHub Desktop.
diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
index 6a83016..8a39703 100644
--- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
import java.lang.management.ManagementFactory;
import javax.jms.Connection;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -43,7 +44,15 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport {
Destination d = broker.getDestination(topic);
assertEquals("Subscription is missing.", 1, d.getConsumers().size());
+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(topic);
+ for (int i = 0; i < 1000; i++) {
+ producer.send(session.createTextMessage("text"));
+ }
+
+ Thread.sleep(1000);
+
session.unsubscribe("SubsId");
session.close();
@@ -92,7 +101,7 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport {
private void createBroker() throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
- broker.setPersistent(false);
+ //broker.setPersistent(false);
broker.setUseJmx(true);
broker.setBrokerName(getName());
broker.start();
diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
index 9334318..d19b8fe 100644
--- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
+++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
@@ -138,17 +138,35 @@ public class PageFile {
Page page;
byte[] current;
byte[] diskBound;
+ int currentLocation = -1;
+ int diskBoundLocation = -1;
+ File tmpFile;
+ int length;
public PageWrite(Page page, byte[] data) {
this.page=page;
current=data;
}
+
+ public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+ this.page = page;
+ this.currentLocation = currentLocation;
+ this.tmpFile = tmpFile;
+ this.length = length;
+ }
public void setCurrent(Page page, byte[] data) {
this.page=page;
current=data;
}
+ public void setCurrentLocation(Page page, int location, int length) {
+ this.page = page;
+ this.currentLocation = location;
+ this.length = length;
+ this.current = null;
+ }
+
@Override
public String toString() {
return "[PageWrite:"+page.getPageId()+"]";
@@ -158,22 +176,40 @@ public class PageFile {
public Page getPage() {
return page;
}
+
+ public byte[] getDiskBound() throws IOException {
+ if (diskBoundLocation != -1) {
+ diskBound = new byte[length];
+ RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
+ file.seek(diskBoundLocation);
+ int readNum = file.read(diskBound);
+ file.close();
+ diskBoundLocation = -1;
+ }
+ return diskBound;
+ }
void begin() {
- diskBound = current;
- current = null;
+ if (currentLocation != -1) {
+ diskBoundLocation = currentLocation;
+ currentLocation = -1;
+ } else {
+ diskBound = current;
+ current = null;
+ }
}
/**
* @return true if there is no pending writes to do.
*/
boolean done() {
+ diskBoundLocation = -1;
diskBound=null;
- return current == null;
+ return current == null || currentLocation == -1;
}
boolean isDone() {
- return diskBound == null && current == null;
+ return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
}
}
@@ -470,7 +506,7 @@ public class PageFile {
return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
}
- private long toOffset(long pageId) {
+ public long toOffset(long pageId) {
return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
}
@@ -823,6 +859,8 @@ public class PageFile {
}
}
+ boolean longTx = false;
+
for (Map.Entry<Long, PageWrite> entry : updates) {
Long key = entry.getKey();
PageWrite value = entry.getValue();
@@ -830,12 +868,19 @@ public class PageFile {
if( write==null ) {
writes.put(key, value);
} else {
- write.setCurrent(value.page, value.current);
+ if (value.currentLocation != -1) {
+ write.setCurrentLocation(value.page, value.currentLocation, value.length);
+ write.tmpFile = value.tmpFile;
+ longTx = true;
+ } else {
+ write.setCurrent(value.page, value.current);
+ }
}
}
// Once we start approaching capacity, notify the writer to start writing
- if( canStartWriteBatch() ) {
+ if( longTx || canStartWriteBatch() ) {
+
if( enabledWriteThread ) {
writes.notify();
} else {
@@ -919,115 +964,90 @@ public class PageFile {
}
}
- /**
- *
- * @return true if there are still pending writes to do.
- * @throws InterruptedException
- * @throws IOException
- */
- private void writeBatch() throws IOException {
-
- CountDownLatch checkpointLatch;
- ArrayList<PageWrite> batch;
- synchronized( writes ) {
+ private void writeBatch() throws IOException {
+
+ CountDownLatch checkpointLatch;
+ ArrayList<PageWrite> batch;
+ synchronized( writes ) {
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
// build a write batch from the current write cache.
for (PageWrite write : writes.values()) {
batch.add(write);
- // Move the current write to the diskBound write, this lets folks update the
+ // Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
write.begin();
- if (write.diskBound == null) {
+ if (write.diskBound == null && write.diskBoundLocation == -1) {
batch.remove(write);
}
}
- // Grab on to the existing checkpoint latch cause once we do this write we can
+ // Grab on to the existing checkpoint latch cause once we do this write we can
// release the folks that were waiting for those writes to hit disk.
checkpointLatch = this.checkpointLatch;
this.checkpointLatch=null;
- }
-
- try {
- if (enableRecoveryFile) {
-
- // Using Adler-32 instead of CRC-32 because it's much faster and
- // it's
- // weakness for short messages with few hundred bytes is not a
- // factor in this case since we know
- // our write batches are going to much larger.
- Checksum checksum = new Adler32();
- for (PageWrite w : batch) {
- try {
- checksum.update(w.diskBound, 0, pageSize);
- } catch (Throwable t) {
- throw IOExceptionSupport.create(
- "Cannot create recovery file. Reason: " + t, t);
- }
- }
-
- // Can we shrink the recovery buffer??
- if (recoveryPageCount > recoveryFileMaxPageCount) {
- int t = Math.max(recoveryFileMinPageCount, batch.size());
- recoveryFile.setLength(recoveryFileSizeForPages(t));
- }
-
- // Record the page writes in the recovery buffer.
- recoveryFile.seek(0);
- // Store the next tx id...
- recoveryFile.writeLong(nextTxid.get());
- // Store the checksum for thw write batch so that on recovery we
- // know if we have a consistent
- // write batch on disk.
- recoveryFile.writeLong(checksum.getValue());
- // Write the # of pages that will follow
- recoveryFile.writeInt(batch.size());
-
- // Write the pages.
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-
- for (PageWrite w : batch) {
- recoveryFile.writeLong(w.page.getPageId());
- recoveryFile.write(w.diskBound, 0, pageSize);
- }
-
- if (enableDiskSyncs) {
- // Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
- }
-
- recoveryPageCount = batch.size();
- }
-
- for (PageWrite w : batch) {
- writeFile.seek(toOffset(w.page.getPageId()));
- writeFile.write(w.diskBound, 0, pageSize);
- w.done();
- }
-
- // Sync again
- if (enableDiskSyncs) {
- writeFile.getFD().sync();
- }
-
- } finally {
- synchronized (writes) {
- for (PageWrite w : batch) {
- // If there are no more pending writes, then remove it from
- // the write cache.
- if (w.isDone()) {
- writes.remove(w.page.getPageId());
- }
- }
- }
-
- if( checkpointLatch!=null ) {
- checkpointLatch.countDown();
- }
- }
- }
+ }
+
+ Checksum checksum = new Adler32();
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ for (PageWrite w : batch) {
+ if (enableRecoveryFile) {
+ try {
+ checksum.update(w.getDiskBound(), 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+ }
+ recoveryFile.writeLong(w.page.getPageId());
+ recoveryFile.write(w.getDiskBound(), 0, pageSize);
+ }
+
+ writeFile.seek(toOffset(w.page.getPageId()));
+ writeFile.write(w.getDiskBound(), 0, pageSize);
+ w.done();
+ }
+
+ try {
+ if (enableRecoveryFile) {
+ // Can we shrink the recovery buffer??
+ if (recoveryPageCount > recoveryFileMaxPageCount) {
+ int t = Math.max(recoveryFileMinPageCount, batch.size());
+ recoveryFile.setLength(recoveryFileSizeForPages(t));
+ }
+
+ // Record the page writes in the recovery buffer.
+ recoveryFile.seek(0);
+ // Store the next tx id...
+ recoveryFile.writeLong(nextTxid.get());
+ // Store the checksum for thw write batch so that on recovery we
+ // know if we have a consistent
+ // write batch on disk.
+ recoveryFile.writeLong(checksum.getValue());
+ // Write the # of pages that will follow
+ recoveryFile.writeInt(batch.size());
+ }
+
+ if (enableDiskSyncs) {
+ // Sync to make sure recovery buffer writes land on disk..
+ recoveryFile.getFD().sync();
+ writeFile.getFD().sync();
+ }
+ } finally {
+ synchronized (writes) {
+ for (PageWrite w : batch) {
+ // If there are no more pending writes, then remove it from
+ // the write cache.
+ if (w.isDone()) {
+ writes.remove(w.page.getPageId());
+ }
+ }
+ }
+
+ if (checkpointLatch != null) {
+ checkpointLatch.countDown();
+ }
+ }
+ }
private long recoveryFileSizeForPages(int pageCount) {
return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
@@ -1135,4 +1155,7 @@ public class PageFile {
return getMainPageFile();
}
+ public File getDirectory() {
+ return directory;
+ }
}
diff --git a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
index 73539a8..c6d0988 100644
--- a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
+++ b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
@@ -16,22 +16,11 @@
*/
package org.apache.kahadb.page;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.io.*;
+import java.util.*;
import org.apache.kahadb.page.PageFile.PageWrite;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.*;
/**
* The class used to read/update a PageFile object. Using a transaction allows you to
@@ -39,6 +28,11 @@ import org.apache.kahadb.util.SequenceSet;
*/
public class Transaction implements Iterable<Page> {
+
+ private RandomAccessFile tmpFile;
+ private File txfFile;
+ private int nextLocation = 0;
+
/**
* The PageOverflowIOException occurs when a page write is requested
* and it's data is larger than what would fit into a single page.
@@ -91,12 +85,16 @@ public class Transaction implements Iterable<Page> {
// If this transaction is updating stuff.. this is the tx of
private long writeTransactionId=-1;
// List of pages that this transaction has modified.
- private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
+ private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
// List of pages allocated in this transaction
private final SequenceSet allocateList = new SequenceSet();
// List of pages freed in this transaction
private final SequenceSet freeList = new SequenceSet();
+ private long maxTransactionSize = 10485760;
+
+ private long size = 0;
+
Transaction(PageFile pageFile) {
this.pageFile = pageFile;
}
@@ -650,6 +648,13 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
+ if (tmpFile != null) {
+ tmpFile.close();
+ if (!getTempFile().delete()) {
+ throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+ }
+ }
+ size = 0;
}
}
@@ -665,6 +670,13 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
+ if (tmpFile != null) {
+ tmpFile.close();
+ if (getTempFile().delete()) {
+ throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+ }
+ }
+ size = 0;
}
}
@@ -675,16 +687,36 @@ public class Transaction implements Iterable<Page> {
return writeTransactionId;
}
+
+ protected File getTempFile() {
+ if (txfFile == null) {
+ txfFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName(Long.toString(getWriteTransactionId())) + ".tmp");
+ }
+ return txfFile;
+ }
+
/**
* Queues up a page write that should get done when commit() gets called.
*/
@SuppressWarnings("unchecked")
private void write(final Page page, byte[] data) throws IOException {
Long key = page.getPageId();
- // TODO: if a large update transaction is in progress, we may want to move
- // all the current updates to a temp file so that we don't keep using
- // up memory.
- writes.put(key, new PageWrite(page, data));
+ size += data.length;
+
+ PageWrite write;
+ if (size > maxTransactionSize) {
+ if (tmpFile == null) {
+ tmpFile = new RandomAccessFile(getTempFile(), "rw");
+ }
+ int location = nextLocation;
+ // tmpFile.seek(nextLocation);
+ tmpFile.write(data);
+ nextLocation = location + data.length;
+ write = new PageWrite(page, location, data.length, getTempFile());
+ } else {
+ write = new PageWrite(page, data);
+ }
+ writes.put(key, write);
}
/**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment