diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
index 6a83016..8a39703 100644
--- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
import java.lang.management.ManagementFactory;
import javax.jms.Connection;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -43,7 +44,15 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport {
Destination d = broker.getDestination(topic);
assertEquals("Subscription is missing.", 1, d.getConsumers().size());
+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(topic);
+ for (int i = 0; i < 1000; i++) {
+ producer.send(session.createTextMessage("text"));
+ }
+
+ Thread.sleep(1000);
+
session.unsubscribe("SubsId");
session.close();
@@ -92,7 +101,7 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport {
private void createBroker() throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
- broker.setPersistent(false);
+ //broker.setPersistent(false);
broker.setUseJmx(true);
broker.setBrokerName(getName());
broker.start();
diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
index 9334318..d19b8fe 100644
--- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
+++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
@@ -138,17 +138,35 @@ public class PageFile {
Page page;
byte[] current;
byte[] diskBound;
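+ // When a large transaction spills to disk, these fields record where the
+ // page data lives in a temp file instead of keeping the bytes in memory.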
+ int currentLocation = -1;
+ int diskBoundLocation = -1;
+ File tmpFile;
+ int length;
public PageWrite(Page page, byte[] data) {
this.page=page;
current=data;
}
+
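+ // Constructs a write whose page data has been spooled to tmpFile at the given offset.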
+ public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+ this.page = page;
+ this.currentLocation = currentLocation;
+ this.tmpFile = tmpFile;
+ this.length = length;
+ }
public void setCurrent(Page page, byte[] data) {
this.page=page;
current=data;
}
+ public void setCurrentLocation(Page page, int location, int length) {
+ this.page = page;
+ this.currentLocation = location;
+ this.length = length;
+ this.current = null;
+ }
+
@Override
public String toString() {
return "[PageWrite:"+page.getPageId()+"]";
@@ -158,22 +176,40 @@ public class PageFile {
public Page getPage() {
return page;
}
+
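+ // Reads spooled page data back from the temp file on demand, so the bytes
+ // are only loaded into memory when the batch is flushed to the page file.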
+ public byte[] getDiskBound() throws IOException {
+ if (diskBoundLocation != -1) {
+ diskBound = new byte[length];
+ RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
+ file.seek(diskBoundLocation);
+ file.readFully(diskBound);
+ file.close();
+ diskBoundLocation = -1;
+ }
+ return diskBound;
+ }
void begin() {
- diskBound = current;
- current = null;
+ if (currentLocation != -1) {
+ diskBoundLocation = currentLocation;
+ currentLocation = -1;
+ } else {
+ diskBound = current;
+ current = null;
+ }
}
/**
* @return true if there is no pending writes to do.
*/
boolean done() {
+ diskBoundLocation = -1;
diskBound=null;
- return current == null;
+ return current == null && currentLocation == -1;
}
boolean isDone() {
- return diskBound == null && current == null;
+ return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
}
}
@@ -470,7 +506,7 @@ public class PageFile {
return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
}
- private long toOffset(long pageId) {
+ public long toOffset(long pageId) {
return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
}
@@ -823,6 +859,8 @@ public class PageFile {
}
}
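+ // Set when any update in this batch was spooled to a temp file; such batches
+ // are flushed immediately instead of waiting for the write cache to fill.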
+ boolean longTx = false;
+
for (Map.Entry<Long, PageWrite> entry : updates) {
Long key = entry.getKey();
PageWrite value = entry.getValue();
@@ -830,12 +868,19 @@ public class PageFile {
if( write==null ) {
writes.put(key, value);
} else {
- write.setCurrent(value.page, value.current);
+ if (value.currentLocation != -1) {
+ write.setCurrentLocation(value.page, value.currentLocation, value.length);
+ write.tmpFile = value.tmpFile;
+ longTx = true;
+ } else {
+ write.setCurrent(value.page, value.current);
+ }
}
}
// Once we start approaching capacity, notify the writer to start writing
- if( canStartWriteBatch() ) {
+ if( longTx || canStartWriteBatch() ) {
+
if( enabledWriteThread ) {
writes.notify();
} else {
@@ -919,115 +964,90 @@ public class PageFile {
}
}
- /**
- *
- * @return true if there are still pending writes to do.
- * @throws InterruptedException
- * @throws IOException
- */
- private void writeBatch() throws IOException {
-
- CountDownLatch checkpointLatch;
- ArrayList<PageWrite> batch;
- synchronized( writes ) {
+ private void writeBatch() throws IOException {
+
+ CountDownLatch checkpointLatch;
+ ArrayList<PageWrite> batch;
+ synchronized( writes ) {
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
// build a write batch from the current write cache.
for (PageWrite write : writes.values()) {
batch.add(write);
- // Move the current write to the diskBound write, this lets folks update the
+ // Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
write.begin();
- if (write.diskBound == null) {
+ if (write.diskBound == null && write.diskBoundLocation == -1) {
batch.remove(write);
}
}
- // Grab on to the existing checkpoint latch cause once we do this write we can
+ // Grab on to the existing checkpoint latch cause once we do this write we can
// release the folks that were waiting for those writes to hit disk.
checkpointLatch = this.checkpointLatch;
this.checkpointLatch=null;
- }
-
- try {
- if (enableRecoveryFile) {
-
- // Using Adler-32 instead of CRC-32 because it's much faster and
- // it's
- // weakness for short messages with few hundred bytes is not a
- // factor in this case since we know
- // our write batches are going to much larger.
- Checksum checksum = new Adler32();
- for (PageWrite w : batch) {
- try {
- checksum.update(w.diskBound, 0, pageSize);
- } catch (Throwable t) {
- throw IOExceptionSupport.create(
- "Cannot create recovery file. Reason: " + t, t);
- }
- }
-
- // Can we shrink the recovery buffer??
- if (recoveryPageCount > recoveryFileMaxPageCount) {
- int t = Math.max(recoveryFileMinPageCount, batch.size());
- recoveryFile.setLength(recoveryFileSizeForPages(t));
- }
-
- // Record the page writes in the recovery buffer.
- recoveryFile.seek(0);
- // Store the next tx id...
- recoveryFile.writeLong(nextTxid.get());
- // Store the checksum for thw write batch so that on recovery we
- // know if we have a consistent
- // write batch on disk.
- recoveryFile.writeLong(checksum.getValue());
- // Write the # of pages that will follow
- recoveryFile.writeInt(batch.size());
-
- // Write the pages.
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-
- for (PageWrite w : batch) {
- recoveryFile.writeLong(w.page.getPageId());
- recoveryFile.write(w.diskBound, 0, pageSize);
- }
-
- if (enableDiskSyncs) {
- // Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
- }
-
- recoveryPageCount = batch.size();
- }
-
- for (PageWrite w : batch) {
- writeFile.seek(toOffset(w.page.getPageId()));
- writeFile.write(w.diskBound, 0, pageSize);
- w.done();
- }
-
- // Sync again
- if (enableDiskSyncs) {
- writeFile.getFD().sync();
- }
-
- } finally {
- synchronized (writes) {
- for (PageWrite w : batch) {
- // If there are no more pending writes, then remove it from
- // the write cache.
- if (w.isDone()) {
- writes.remove(w.page.getPageId());
- }
- }
- }
-
- if( checkpointLatch!=null ) {
- checkpointLatch.countDown();
- }
- }
- }
+ }
+
+ Checksum checksum = new Adler32();
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ for (PageWrite w : batch) {
+ if (enableRecoveryFile) {
+ try {
+ checksum.update(w.getDiskBound(), 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+ }
+ recoveryFile.writeLong(w.page.getPageId());
+ recoveryFile.write(w.getDiskBound(), 0, pageSize);
+ }
+
+ writeFile.seek(toOffset(w.page.getPageId()));
+ writeFile.write(w.getDiskBound(), 0, pageSize);
+ w.done();
+ }
+
+ try {
+ if (enableRecoveryFile) {
+ // Can we shrink the recovery buffer??
+ if (recoveryPageCount > recoveryFileMaxPageCount) {
+ int t = Math.max(recoveryFileMinPageCount, batch.size());
+ recoveryFile.setLength(recoveryFileSizeForPages(t));
+ }
+
+ // Record the page writes in the recovery buffer.
+ recoveryFile.seek(0);
+ // Store the next tx id...
+ recoveryFile.writeLong(nextTxid.get());
+ // Store the checksum for the write batch so that on recovery we
+ // know if we have a consistent
+ // write batch on disk.
+ recoveryFile.writeLong(checksum.getValue());
+ // Write the # of pages that will follow
+ recoveryFile.writeInt(batch.size());
+ }
+
+ if (enableDiskSyncs) {
+ // Sync to make sure recovery buffer writes land on disk..
+ recoveryFile.getFD().sync();
+ writeFile.getFD().sync();
+ }
+ } finally {
+ synchronized (writes) {
+ for (PageWrite w : batch) {
+ // If there are no more pending writes, then remove it from
+ // the write cache.
+ if (w.isDone()) {
+ writes.remove(w.page.getPageId());
+ }
+ }
+ }
+
+ if (checkpointLatch != null) {
+ checkpointLatch.countDown();
+ }
+ }
+ }
private long recoveryFileSizeForPages(int pageCount) {
return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
@@ -1135,4 +1155,7 @@ public class PageFile {
return getMainPageFile();
}
+ public File getDirectory() {
+ return directory;
+ }
}
diff --git a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
index 73539a8..c6d0988 100644
--- a/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
+++ b/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
@@ -16,22 +16,11 @@
*/
package org.apache.kahadb.page;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.io.*;
+import java.util.*;
import org.apache.kahadb.page.PageFile.PageWrite;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.*;
/**
* The class used to read/update a PageFile object. Using a transaction allows you to
@@ -39,6 +28,11 @@ import org.apache.kahadb.util.SequenceSet;
*/
public class Transaction implements Iterable<Page> {
+
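+ // Temp file used to spool page writes once this transaction grows past maxTransactionSize.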
+ private RandomAccessFile tmpFile;
+ private File txfFile;
+ private int nextLocation = 0;
+
/**
* The PageOverflowIOException occurs when a page write is requested
* and it's data is larger than what would fit into a single page.
@@ -91,12 +85,16 @@ public class Transaction implements Iterable<Page> {
// If this transaction is updating stuff.. this is the tx of
private long writeTransactionId=-1;
// List of pages that this transaction has modified.
- private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
+ private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
// List of pages allocated in this transaction
private final SequenceSet allocateList = new SequenceSet();
// List of pages freed in this transaction
private final SequenceSet freeList = new SequenceSet();
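+ // Once the accumulated write size exceeds this many bytes (10MB), further
+ // page data is spooled to a temp file instead of being held in memory.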
+ private long maxTransactionSize = 10485760;
+
+ private long size = 0;
+
Transaction(PageFile pageFile) {
this.pageFile = pageFile;
}
@@ -650,6 +648,13 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
+ if (tmpFile != null) {
+ tmpFile.close();
+ if (!getTempFile().delete()) {
+ throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+ }
+ }
+ size = 0;
}
}
@@ -665,6 +670,13 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
+ if (tmpFile != null) {
+ tmpFile.close();
+ if (!getTempFile().delete()) {
+ throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+ }
+ }
+ size = 0;
}
}
@@ -675,16 +687,36 @@ public class Transaction implements Iterable<Page> {
return writeTransactionId;
}
+
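+ // Lazily creates this transaction's temp file, named after its write transaction id.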
+ protected File getTempFile() {
+ if (txfFile == null) {
+ txfFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName(Long.toString(getWriteTransactionId())) + ".tmp");
+ }
+ return txfFile;
+ }
+
/**
* Queues up a page write that should get done when commit() gets called.
*/
@SuppressWarnings("unchecked")
private void write(final Page page, byte[] data) throws IOException {
Long key = page.getPageId();
- // TODO: if a large update transaction is in progress, we may want to move
- // all the current updates to a temp file so that we don't keep using
- // up memory.
- writes.put(key, new PageWrite(page, data));
+ size += data.length;
+
+ PageWrite write;
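+ // Large transaction: append the page data to the temp file and track only
+ // its offset and length, so memory use stays bounded.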
+ if (size > maxTransactionSize) {
+ if (tmpFile == null) {
+ tmpFile = new RandomAccessFile(getTempFile(), "rw");
+ }
+ int location = nextLocation;
+ // tmpFile.seek(nextLocation);
+ tmpFile.write(data);
+ nextLocation = location + data.length;
+ write = new PageWrite(page, location, data.length, getTempFile());
+ } else {
+ write = new PageWrite(page, data);
+ }
+ writes.put(key, write);
}
/**