Created
December 3, 2009 13:32
-
-
Save jabley/248157 to your computer and use it in GitHub Desktop.
ehcache 1.6.2 DiskStore patch for improved concurrency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: src/main/java/net/sf/ehcache/store/DiskStore.java | |
=================================================================== | |
--- src/main/java/net/sf/ehcache/store/DiskStore.java (revision 1486) | |
+++ src/main/java/net/sf/ehcache/store/DiskStore.java (working copy) | |
@@ -21,6 +21,7 @@ | |
import net.sf.ehcache.Ehcache; | |
import net.sf.ehcache.Element; | |
import net.sf.ehcache.Status; | |
+import net.sf.ehcache.concurrent.ConcurrencyUtil; | |
import net.sf.ehcache.config.CacheConfiguration; | |
import net.sf.ehcache.event.RegisteredEventListeners; | |
import net.sf.ehcache.util.MemoryEfficientByteArrayOutputStream; | |
@@ -85,11 +86,11 @@ | |
private final String name; | |
private boolean active; | |
- private RandomAccessFile randomAccessFile; | |
+ private RandomAccessFile[] randomAccessFiles; | |
- private ConcurrentHashMap diskElements = new ConcurrentHashMap(); | |
- private List freeSpace = Collections.synchronizedList(new ArrayList()); | |
- private volatile ConcurrentHashMap spool = new ConcurrentHashMap(); | |
+ private ConcurrentHashMap<Object, DiskElement> diskElements = new ConcurrentHashMap<Object, DiskElement>(); | |
+ private List<DiskElement> freeSpace = Collections.synchronizedList(new ArrayList<DiskElement>()); | |
+ private volatile ConcurrentHashMap<Object, Element> spool = new ConcurrentHashMap<Object, Element>(); | |
private Thread spoolAndExpiryThread; | |
@@ -137,6 +138,9 @@ | |
//The spoolAndExpiryThread keeps running until this is set to false; | |
private volatile boolean spoolAndExpiryThreadActive; | |
+ | |
+ /* Lock around spool field. */ | |
+ private final Object spoolLock = new Object(); | |
/** | |
* Creates a disk store. | |
@@ -159,7 +163,6 @@ | |
writeIndexFlag = new AtomicBoolean(false); | |
writeIndexFlagLock = new Object(); | |
- | |
try { | |
initialiseFiles(); | |
@@ -217,9 +220,25 @@ | |
} | |
// Open the data file as random access. The dataFile is created if necessary. | |
- randomAccessFile = new RandomAccessFile(dataFile, "rw"); | |
+ this.randomAccessFiles = allocateRandomAccessFiles(); | |
} | |
+ private RandomAccessFile[] allocateRandomAccessFiles() throws IOException { | |
+ int numberOfFiles = Runtime.getRuntime().availableProcessors(); | |
+ | |
+ while ((numberOfFiles & (numberOfFiles - 1)) != 0) { | |
+ ++numberOfFiles; | |
+ } | |
+ RandomAccessFile [] result = new RandomAccessFile[numberOfFiles]; | |
+ | |
+ int n = result.length; | |
+ for (int i = 0; i < n; ++i) { | |
+ result[i] = new RandomAccessFile(dataFile, "rw"); | |
+ } | |
+ | |
+ return result; | |
+ } | |
+ | |
private void deleteIndexIfNoData() { | |
boolean dataFileExists = dataFile.exists(); | |
boolean indexFileExists = indexFile.exists(); | |
@@ -245,16 +264,19 @@ | |
* | |
* @return The element | |
*/ | |
- public final synchronized Element get(final Object key) { | |
+ public final Element get(final Object key) { | |
try { | |
checkActive(); | |
// Check in the spool. Remove if present | |
Element element; | |
- element = (Element) spool.remove(key); | |
- if (element != null) { | |
- element.updateAccessStatistics(); | |
- return element; | |
+ /* Make sure that the spool isn't being emptied at the moment. */ | |
+ synchronized (spoolLock) { | |
+ element = (Element) spool.remove(key); | |
+ if (element != null) { | |
+ element.updateAccessStatistics(); | |
+ return element; | |
+ } | |
} | |
// Check if the element is on disk | |
@@ -287,9 +309,14 @@ | |
return diskElements.containsKey(key) || spool.containsKey(key); | |
} | |
+ private RandomAccessFile selectRandomAccessFile(Object objectKey) { | |
+ return this.randomAccessFiles[ConcurrencyUtil.selectLock(objectKey, this.randomAccessFiles.length)]; | |
+ } | |
+ | |
private Element loadElementFromDiskElement(DiskElement diskElement) throws IOException, ClassNotFoundException { | |
Element element; | |
final byte[] buffer; | |
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(diskElement.getObjectKey()); | |
synchronized (randomAccessFile) { | |
// Load the element | |
randomAccessFile.seek(diskElement.position); | |
@@ -324,7 +351,7 @@ | |
* | |
* @return The element | |
*/ | |
- public final synchronized Element getQuiet(final Object key) { | |
+ public final Element getQuiet(final Object key) { | |
try { | |
checkActive(); | |
@@ -360,11 +387,11 @@ | |
* @return An Object[] of {@link Serializable} keys | |
*/ | |
public final synchronized Object[] getKeyArray() { | |
- Set elementKeySet; | |
+ Set<Object> elementKeySet; | |
elementKeySet = diskElements.keySet(); | |
- Set spoolKeySet; | |
+ Set<Object> spoolKeySet; | |
spoolKeySet = spool.keySet(); | |
- Set allKeysSet = new HashSet(elementKeySet.size() + spoolKeySet.size()); | |
+ Set<Object> allKeysSet = new HashSet<Object>(elementKeySet.size() + spoolKeySet.size()); | |
allKeysSet.addAll(elementKeySet); | |
allKeysSet.addAll(spoolKeySet); | |
return allKeysSet.toArray(); | |
@@ -506,12 +533,14 @@ | |
checkActive(); | |
// Ditch all the elements, and truncate the file | |
- spool = new ConcurrentHashMap(); | |
- diskElements = new ConcurrentHashMap(); | |
- freeSpace = Collections.synchronizedList(new ArrayList()); | |
+ spool = new ConcurrentHashMap<Object, Element>(); | |
+ diskElements = new ConcurrentHashMap<Object, DiskElement>(); | |
+ freeSpace = Collections.synchronizedList(new ArrayList<DiskElement>()); | |
totalSize = 0; | |
- synchronized (randomAccessFile) { | |
- randomAccessFile.setLength(0); | |
+ synchronized (randomAccessFiles) { | |
+ for (RandomAccessFile file : this.randomAccessFiles) { | |
+ file.setLength(0); | |
+ } | |
} | |
if (persistent) { | |
indexFile.delete(); | |
@@ -558,9 +587,11 @@ | |
spool.clear(); | |
diskElements.clear(); | |
freeSpace.clear(); | |
- synchronized (randomAccessFile) { | |
- if (randomAccessFile != null) { | |
- randomAccessFile.close(); | |
+ if (randomAccessFiles != null) { | |
+ synchronized (randomAccessFiles) { | |
+ for (RandomAccessFile file : randomAccessFiles) { | |
+ file.close(); | |
+ } | |
} | |
} | |
deleteFilesInAutoGeneratedDirectory(); | |
@@ -572,7 +603,7 @@ | |
LOG.log(Level.SEVERE, name + "Cache: Could not shut down disk cache. Initial cause was " + e.getMessage(), e); | |
} finally { | |
active = false; | |
- randomAccessFile = null; | |
+ randomAccessFiles = null; | |
//release reference to cache | |
cache = null; | |
@@ -697,29 +728,31 @@ | |
return; | |
} | |
- Map copyOfSpool = swapSpoolReference(); | |
+ /* Mutation of spool and DiskElements should be atomic. */ | |
+ synchronized (spoolLock) { | |
+ Map<Object, Element> copyOfSpool = swapSpoolReference(); | |
- //does not guarantee insertion order | |
- Iterator valuesIterator = copyOfSpool.values().iterator(); | |
- while (valuesIterator.hasNext()) { | |
- writeOrReplaceEntry(valuesIterator.next()); | |
- valuesIterator.remove(); | |
+ //does not guarantee insertion order | |
+ Iterator<Element> valuesIterator = copyOfSpool.values().iterator(); | |
+ while (valuesIterator.hasNext()) { | |
+ writeOrReplaceEntry(valuesIterator.next()); | |
+ valuesIterator.remove(); | |
+ } | |
} | |
} | |
- private Map swapSpoolReference() { | |
+ private Map<Object, Element> swapSpoolReference() { | |
Map copyOfSpool = null; | |
// Copy the reference of the old spool, not the contents. Avoid potential spike in memory usage | |
copyOfSpool = spool; | |
// use a new map making the reference swap above SAFE | |
- spool = new ConcurrentHashMap(); | |
+ spool = new ConcurrentHashMap<Object, Element>(); | |
return copyOfSpool; | |
} | |
- private void writeOrReplaceEntry(Object object) throws IOException { | |
- Element element = (Element) object; | |
+ private void writeOrReplaceEntry(Element element) throws IOException { | |
if (element == null) { | |
return; | |
} | |
@@ -744,6 +777,7 @@ | |
DiskElement diskElement = checkForFreeBlock(bufferLength); | |
// Write the record | |
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(key); | |
synchronized (randomAccessFile) { | |
randomAccessFile.seek(diskElement.position); | |
randomAccessFile.write(buffer.toByteArray(), 0, bufferLength); | |
@@ -825,8 +859,8 @@ | |
DiskElement diskElement = findFreeBlock(bufferLength); | |
if (diskElement == null) { | |
diskElement = new DiskElement(); | |
- synchronized (randomAccessFile) { | |
- diskElement.position = randomAccessFile.length(); | |
+ synchronized (randomAccessFiles[0]) { | |
+ diskElement.position = randomAccessFiles[0].length(); | |
} | |
diskElement.blockSize = bufferLength; | |
} | |
@@ -876,14 +910,14 @@ | |
fin = new FileInputStream(indexFile); | |
objectInputStream = new ObjectInputStream(fin); | |
- Map diskElementsMap = (Map)objectInputStream.readObject(); | |
+ Map<Object, DiskElement> diskElementsMap = (Map)objectInputStream.readObject(); | |
if (diskElementsMap instanceof ConcurrentHashMap) { | |
diskElements = (ConcurrentHashMap)diskElementsMap; | |
} else { | |
diskElements = new ConcurrentHashMap(diskElementsMap); | |
} | |
- freeSpace = (List) objectInputStream.readObject(); | |
+ freeSpace = (List<DiskElement>) objectInputStream.readObject(); | |
success = true; | |
} catch (StreamCorruptedException e) { | |
LOG.log(Level.SEVERE, "Corrupt index file. Creating new index."); | |
@@ -948,8 +982,8 @@ | |
final long now = System.currentTimeMillis(); | |
// Clean up the spool | |
- for (Iterator iterator = spool.values().iterator(); iterator.hasNext();) { | |
- final Element element = (Element) iterator.next(); | |
+ for (Iterator<Element> iterator = spool.values().iterator(); iterator.hasNext();) { | |
+ final Element element = iterator.next(); | |
if (element.isExpired()) { | |
// An expired element | |
if (LOG.isLoggable(Level.FINE)) { | |
@@ -963,8 +997,8 @@ | |
Element element = null; | |
RegisteredEventListeners listeners = cache.getCacheEventNotificationService(); | |
// Clean up disk elements | |
- for (Iterator iterator = diskElements.entrySet().iterator(); iterator.hasNext();) { | |
- final Map.Entry entry = (Map.Entry) iterator.next(); | |
+ for (Iterator<Map.Entry<Object, DiskElement>> iterator = diskElements.entrySet().iterator(); iterator.hasNext();) { | |
+ final Map.Entry<Object, DiskElement> entry = iterator.next(); | |
final DiskElement diskElement = (DiskElement) entry.getValue(); | |
if (now >= diskElement.expiryTime) { | |
@@ -1045,7 +1079,7 @@ | |
* Copies of expiryTime and hitcount are held here as a performance optimisation, so | |
* that we do not need to load the data from Disk to get this often used information. | |
*/ | |
- private static final class DiskElement implements Serializable { | |
+ public static final class DiskElement implements Serializable { | |
private static final long serialVersionUID = -717310932566592289L; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment