Created
December 3, 2009 13:30
-
-
Save jabley/248153 to your computer and use it in GitHub Desktop.
ehcache trunk DiskStore patch for improved concurrency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: src/main/java/net/sf/ehcache/store/DiskStore.java | |
=================================================================== | |
--- src/main/java/net/sf/ehcache/store/DiskStore.java (revision 1470) | |
+++ src/main/java/net/sf/ehcache/store/DiskStore.java (working copy) | |
@@ -46,6 +46,7 @@ | |
import net.sf.ehcache.Ehcache; | |
import net.sf.ehcache.Element; | |
import net.sf.ehcache.Status; | |
+import net.sf.ehcache.concurrent.ConcurrencyUtil; | |
import net.sf.ehcache.config.CacheConfiguration; | |
import net.sf.ehcache.event.RegisteredEventListeners; | |
import net.sf.ehcache.util.MemoryEfficientByteArrayOutputStream; | |
@@ -86,7 +87,7 @@ | |
private final String name; | |
private boolean active; | |
- private RandomAccessFile randomAccessFile; | |
+ private RandomAccessFile[] randomAccessFiles; | |
private ConcurrentHashMap diskElements = new ConcurrentHashMap(); | |
private List freeSpace = Collections.synchronizedList(new ArrayList()); | |
@@ -138,6 +139,9 @@ | |
//The spoolAndExpiryThread keeps running until this is set to false; | |
private volatile boolean spoolAndExpiryThreadActive; | |
+ | |
+ /* Lock around spool field. */ | |
+ private final Object spoolLock = new Object(); | |
/** | |
* Creates a disk store. | |
@@ -160,7 +164,6 @@ | |
writeIndexFlag = new AtomicBoolean(false); | |
writeIndexFlagLock = new Object(); | |
- | |
try { | |
initialiseFiles(); | |
@@ -225,9 +228,25 @@ | |
} | |
// Open the data file as random access. The dataFile is created if necessary. | |
- randomAccessFile = new RandomAccessFile(dataFile, "rw"); | |
+ this.randomAccessFiles = allocateRandomAccessFiles(); | |
} | |
+ private RandomAccessFile[] allocateRandomAccessFiles() throws IOException { | |
+ int numberOfFiles = Runtime.getRuntime().availableProcessors(); | |
+ | |
+ while ((numberOfFiles & (numberOfFiles - 1)) != 0) { | |
+ ++numberOfFiles; | |
+ } | |
+ RandomAccessFile [] result = new RandomAccessFile[numberOfFiles]; | |
+ | |
+ int n = result.length; | |
+ for (int i = 0; i < n; ++i) { | |
+ result[i] = new RandomAccessFile(dataFile, "rw"); | |
+ } | |
+ | |
+ return result; | |
+ } | |
+ | |
private void deleteIndexIfCorrupt() { | |
boolean dataFileExists = dataFile.exists(); | |
boolean indexFileExists = indexFile.exists(); | |
@@ -264,17 +283,21 @@ | |
* | |
* @return The element | |
*/ | |
- public final synchronized Element get(final Object key) { | |
+ public final Element get(final Object key) { | |
try { | |
checkActive(); | |
// Check in the spool. Remove if present | |
Element element; | |
- element = (Element) spool.remove(key); | |
- if (element != null) { | |
- return element; | |
+ | |
+ /* Make sure that the spool isn't being emptied at the moment. */ | |
+ synchronized (spoolLock) { | |
+ element = (Element) spool.remove(key); | |
+ if (element != null) { | |
+ return element; | |
+ } | |
} | |
- | |
+ | |
// Check if the element is on disk | |
final DiskElement diskElement = (DiskElement) diskElements.get(key); | |
if (diskElement == null) { | |
@@ -302,9 +325,14 @@ | |
return diskElements.containsKey(key) || spool.containsKey(key); | |
} | |
+ private RandomAccessFile selectRandomAccessFile(Object objectKey) { | |
+ return this.randomAccessFiles[ConcurrencyUtil.selectLock(objectKey, this.randomAccessFiles.length)]; | |
+ } | |
+ | |
private Element loadElementFromDiskElement(DiskElement diskElement) throws IOException, ClassNotFoundException { | |
Element element; | |
final byte[] buffer; | |
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(diskElement.getObjectKey()); | |
synchronized (randomAccessFile) { | |
// Load the element | |
randomAccessFile.seek(diskElement.position); | |
@@ -341,7 +369,7 @@ | |
* @return The element | |
* @see #get(Object) | |
*/ | |
- public final synchronized Element getQuiet(final Object key) { | |
+ public final Element getQuiet(final Object key) { | |
return get(key); | |
} | |
@@ -508,8 +536,10 @@ | |
diskElements = new ConcurrentHashMap(); | |
freeSpace = Collections.synchronizedList(new ArrayList()); | |
totalSize = 0; | |
- synchronized (randomAccessFile) { | |
- randomAccessFile.setLength(0); | |
+ synchronized (randomAccessFiles) { | |
+ for (RandomAccessFile file : this.randomAccessFiles) { | |
+ file.setLength(0); | |
+ } | |
} | |
if (persistent) { | |
indexFile.delete(); | |
@@ -556,9 +586,11 @@ | |
spool.clear(); | |
diskElements.clear(); | |
freeSpace.clear(); | |
- if (randomAccessFile != null) { | |
- synchronized (randomAccessFile) { | |
- randomAccessFile.close(); | |
+ if (randomAccessFiles != null) { | |
+ synchronized (randomAccessFiles) { | |
+ for (RandomAccessFile file : randomAccessFiles) { | |
+ file.close(); | |
+ } | |
} | |
} | |
deleteFilesInAutoGeneratedDirectory(); | |
@@ -570,7 +602,7 @@ | |
LOG.error(name + "Cache: Could not shut down disk cache. Initial cause was " + e.getMessage(), e); | |
} finally { | |
active = false; | |
- randomAccessFile = null; | |
+ randomAccessFiles = null; | |
//release reference to cache | |
cache = null; | |
@@ -695,13 +727,16 @@ | |
return; | |
} | |
- Map copyOfSpool = swapSpoolReference(); | |
+ /* Mutation of spool and DiskElements should be atomic. */ | |
+ synchronized (spoolLock) { | |
+ Map copyOfSpool = swapSpoolReference(); | |
- //does not guarantee insertion order | |
- Iterator valuesIterator = copyOfSpool.values().iterator(); | |
- while (valuesIterator.hasNext()) { | |
- writeOrReplaceEntry(valuesIterator.next()); | |
- valuesIterator.remove(); | |
+ //does not guarantee insertion order | |
+ Iterator valuesIterator = copyOfSpool.values().iterator(); | |
+ while (valuesIterator.hasNext()) { | |
+ writeOrReplaceEntry(valuesIterator.next()); | |
+ valuesIterator.remove(); | |
+ } | |
} | |
} | |
@@ -742,6 +777,7 @@ | |
DiskElement diskElement = checkForFreeBlock(bufferLength); | |
// Write the record | |
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(key); | |
synchronized (randomAccessFile) { | |
randomAccessFile.seek(diskElement.position); | |
randomAccessFile.write(buffer.toByteArray(), 0, bufferLength); | |
@@ -823,8 +859,8 @@ | |
DiskElement diskElement = findFreeBlock(bufferLength); | |
if (diskElement == null) { | |
diskElement = new DiskElement(); | |
- synchronized (randomAccessFile) { | |
- diskElement.position = randomAccessFile.length(); | |
+ synchronized (randomAccessFiles[0]) { | |
+ diskElement.position = randomAccessFiles[0].length(); | |
} | |
diskElement.blockSize = bufferLength; | |
} | |
@@ -1038,7 +1074,7 @@ | |
* Copies of expiryTime and hitcount are held here as a performance optimisation, so | |
* that we do not need to load the data from Disk to get this often used information. | |
*/ | |
- private static final class DiskElement implements Serializable { | |
+ public static final class DiskElement implements Serializable { | |
private static final long serialVersionUID = -717310932566592289L; | |
@@ -1315,4 +1351,5 @@ | |
public boolean isCacheCoherent() { | |
return false; | |
} | |
+ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment