Skip to content

Instantly share code, notes, and snippets.

@jabley
Created December 3, 2009 13:32
Show Gist options
  • Save jabley/248157 to your computer and use it in GitHub Desktop.
Save jabley/248157 to your computer and use it in GitHub Desktop.
ehcache 1.6.2 DiskStore patch for improved concurrency
Index: src/main/java/net/sf/ehcache/store/DiskStore.java
===================================================================
--- src/main/java/net/sf/ehcache/store/DiskStore.java (revision 1486)
+++ src/main/java/net/sf/ehcache/store/DiskStore.java (working copy)
@@ -21,6 +21,7 @@
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.Status;
+import net.sf.ehcache.concurrent.ConcurrencyUtil;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.event.RegisteredEventListeners;
import net.sf.ehcache.util.MemoryEfficientByteArrayOutputStream;
@@ -85,11 +86,11 @@
private final String name;
private boolean active;
- private RandomAccessFile randomAccessFile;
+ private RandomAccessFile[] randomAccessFiles;
- private ConcurrentHashMap diskElements = new ConcurrentHashMap();
- private List freeSpace = Collections.synchronizedList(new ArrayList());
- private volatile ConcurrentHashMap spool = new ConcurrentHashMap();
+ private ConcurrentHashMap<Object, DiskElement> diskElements = new ConcurrentHashMap<Object, DiskElement>();
+ private List<DiskElement> freeSpace = Collections.synchronizedList(new ArrayList<DiskElement>());
+ private volatile ConcurrentHashMap<Object, Element> spool = new ConcurrentHashMap<Object, Element>();
private Thread spoolAndExpiryThread;
@@ -137,6 +138,9 @@
//The spoolAndExpiryThread keeps running until this is set to false;
private volatile boolean spoolAndExpiryThreadActive;
+
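+ /* Held by get() while it removes from the spool, and by the spool flush while the spool reference is swapped and its elements are written out. */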
+ private final Object spoolLock = new Object();
/**
* Creates a disk store.
@@ -159,7 +163,6 @@
writeIndexFlag = new AtomicBoolean(false);
writeIndexFlagLock = new Object();
-
try {
initialiseFiles();
@@ -217,9 +220,25 @@
}
// Open the data file as random access. The dataFile is created if necessary.
- randomAccessFile = new RandomAccessFile(dataFile, "rw");
+ this.randomAccessFiles = allocateRandomAccessFiles();
}
+ /**
+  * Opens the pool of handles onto the data file.
+  * <p>
+  * The pool size is the number of available processors rounded up to the next power of two
+  * (presumably because ConcurrencyUtil.selectLock expects a power-of-two count - TODO confirm).
+  *
+  * @return the opened handles, every one of them onto {@code dataFile} in "rw" mode
+  * @throws IOException if any handle cannot be opened; handles opened before the failure are
+  *         closed again so that no file descriptors leak
+  */
+ private RandomAccessFile[] allocateRandomAccessFiles() throws IOException {
+     int numberOfFiles = Runtime.getRuntime().availableProcessors();
+
+     // Round up to a power of two: n & (n - 1) is zero only when a single bit is set.
+     while ((numberOfFiles & (numberOfFiles - 1)) != 0) {
+         ++numberOfFiles;
+     }
+     RandomAccessFile[] result = new RandomAccessFile[numberOfFiles];
+
+     boolean allOpened = false;
+     try {
+         for (int i = 0; i < result.length; ++i) {
+             result[i] = new RandomAccessFile(dataFile, "rw");
+         }
+         allOpened = true;
+     } finally {
+         if (!allOpened) {
+             // Opening failed part-way through: release whatever was already opened.
+             for (RandomAccessFile opened : result) {
+                 if (opened != null) {
+                     try {
+                         opened.close();
+                     } catch (IOException ignored) {
+                         // Best effort only; the original failure is the one that propagates.
+                     }
+                 }
+             }
+         }
+     }
+
+     return result;
+ }
+
private void deleteIndexIfNoData() {
boolean dataFileExists = dataFile.exists();
boolean indexFileExists = indexFile.exists();
@@ -245,16 +264,19 @@
*
* @return The element
*/
- public final synchronized Element get(final Object key) {
+ public final Element get(final Object key) {
try {
checkActive();
// Check in the spool. Remove if present
Element element;
- element = (Element) spool.remove(key);
- if (element != null) {
- element.updateAccessStatistics();
- return element;
+ /* Make sure that the spool isn't being emptied at the moment. */
+ synchronized (spoolLock) {
+ element = (Element) spool.remove(key);
+ if (element != null) {
+ element.updateAccessStatistics();
+ return element;
+ }
}
// Check if the element is on disk
@@ -287,9 +309,14 @@
return diskElements.containsKey(key) || spool.containsKey(key);
}
+ /**
+  * Picks the data-file handle to use for the given key.
+  * <p>
+  * The index into the handle pool is whatever ConcurrencyUtil.selectLock returns for the key
+  * and the pool size.
+  *
+  * @param objectKey the key of the element about to be read or written
+  * @return the handle from the pool at the selected index
+  */
+ private RandomAccessFile selectRandomAccessFile(Object objectKey) {
+     final int poolSize = this.randomAccessFiles.length;
+     final int index = ConcurrencyUtil.selectLock(objectKey, poolSize);
+     return this.randomAccessFiles[index];
+ }
+
private Element loadElementFromDiskElement(DiskElement diskElement) throws IOException, ClassNotFoundException {
Element element;
final byte[] buffer;
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(diskElement.getObjectKey());
synchronized (randomAccessFile) {
// Load the element
randomAccessFile.seek(diskElement.position);
@@ -324,7 +351,7 @@
*
* @return The element
*/
- public final synchronized Element getQuiet(final Object key) {
+ public final Element getQuiet(final Object key) {
try {
checkActive();
@@ -360,11 +387,11 @@
* @return An Object[] of {@link Serializable} keys
*/
public final synchronized Object[] getKeyArray() {
- Set elementKeySet;
+ Set<Object> elementKeySet;
elementKeySet = diskElements.keySet();
- Set spoolKeySet;
+ Set<Object> spoolKeySet;
spoolKeySet = spool.keySet();
- Set allKeysSet = new HashSet(elementKeySet.size() + spoolKeySet.size());
+ Set<Object> allKeysSet = new HashSet<Object>(elementKeySet.size() + spoolKeySet.size());
allKeysSet.addAll(elementKeySet);
allKeysSet.addAll(spoolKeySet);
return allKeysSet.toArray();
@@ -506,12 +533,14 @@
checkActive();
// Ditch all the elements, and truncate the file
- spool = new ConcurrentHashMap();
- diskElements = new ConcurrentHashMap();
- freeSpace = Collections.synchronizedList(new ArrayList());
+ spool = new ConcurrentHashMap<Object, Element>();
+ diskElements = new ConcurrentHashMap<Object, DiskElement>();
+ freeSpace = Collections.synchronizedList(new ArrayList<DiskElement>());
totalSize = 0;
- synchronized (randomAccessFile) {
- randomAccessFile.setLength(0);
+ synchronized (randomAccessFiles) {
+ for (RandomAccessFile file : this.randomAccessFiles) {
+ file.setLength(0);
+ }
}
if (persistent) {
indexFile.delete();
@@ -558,9 +587,11 @@
spool.clear();
diskElements.clear();
freeSpace.clear();
- synchronized (randomAccessFile) {
- if (randomAccessFile != null) {
- randomAccessFile.close();
+ if (randomAccessFiles != null) {
+ synchronized (randomAccessFiles) {
+ for (RandomAccessFile file : randomAccessFiles) {
+ file.close();
+ }
}
}
deleteFilesInAutoGeneratedDirectory();
@@ -572,7 +603,7 @@
LOG.log(Level.SEVERE, name + "Cache: Could not shut down disk cache. Initial cause was " + e.getMessage(), e);
} finally {
active = false;
- randomAccessFile = null;
+ randomAccessFiles = null;
//release reference to cache
cache = null;
@@ -697,29 +728,31 @@
return;
}
- Map copyOfSpool = swapSpoolReference();
+ /* Mutation of spool and diskElements should be atomic. */
+ synchronized (spoolLock) {
+ Map<Object, Element> copyOfSpool = swapSpoolReference();
- //does not guarantee insertion order
- Iterator valuesIterator = copyOfSpool.values().iterator();
- while (valuesIterator.hasNext()) {
- writeOrReplaceEntry(valuesIterator.next());
- valuesIterator.remove();
+ //does not guarantee insertion order
+ Iterator<Element> valuesIterator = copyOfSpool.values().iterator();
+ while (valuesIterator.hasNext()) {
+ writeOrReplaceEntry(valuesIterator.next());
+ valuesIterator.remove();
+ }
}
}
- private Map swapSpoolReference() {
+ /**
+  * Replaces the spool with a fresh, empty map and hands back the previous one.
+  * <p>
+  * Only the reference is exchanged; the contents of the old spool are not copied.
+  *
+  * @return the map that was the spool before this call
+  */
+ private Map<Object, Element> swapSpoolReference() {
- Map copyOfSpool = null;
+ Map<Object, Element> copyOfSpool = null;
// Copy the reference of the old spool, not the contents. Avoid potential spike in memory usage
copyOfSpool = spool;
// use a new map making the reference swap above SAFE
- spool = new ConcurrentHashMap();
+ spool = new ConcurrentHashMap<Object, Element>();
return copyOfSpool;
}
- private void writeOrReplaceEntry(Object object) throws IOException {
- Element element = (Element) object;
+ private void writeOrReplaceEntry(Element element) throws IOException {
if (element == null) {
return;
}
@@ -744,6 +777,7 @@
DiskElement diskElement = checkForFreeBlock(bufferLength);
// Write the record
+ RandomAccessFile randomAccessFile = selectRandomAccessFile(key);
synchronized (randomAccessFile) {
randomAccessFile.seek(diskElement.position);
randomAccessFile.write(buffer.toByteArray(), 0, bufferLength);
@@ -825,8 +859,8 @@
DiskElement diskElement = findFreeBlock(bufferLength);
if (diskElement == null) {
diskElement = new DiskElement();
- synchronized (randomAccessFile) {
- diskElement.position = randomAccessFile.length();
+ synchronized (randomAccessFiles[0]) {
+ diskElement.position = randomAccessFiles[0].length();
}
diskElement.blockSize = bufferLength;
}
@@ -876,14 +910,14 @@
fin = new FileInputStream(indexFile);
objectInputStream = new ObjectInputStream(fin);
- Map diskElementsMap = (Map)objectInputStream.readObject();
+ Map<Object, DiskElement> diskElementsMap = (Map)objectInputStream.readObject();
if (diskElementsMap instanceof ConcurrentHashMap) {
diskElements = (ConcurrentHashMap)diskElementsMap;
} else {
diskElements = new ConcurrentHashMap(diskElementsMap);
}
- freeSpace = (List) objectInputStream.readObject();
+ freeSpace = (List<DiskElement>) objectInputStream.readObject();
success = true;
} catch (StreamCorruptedException e) {
LOG.log(Level.SEVERE, "Corrupt index file. Creating new index.");
@@ -948,8 +982,8 @@
final long now = System.currentTimeMillis();
// Clean up the spool
- for (Iterator iterator = spool.values().iterator(); iterator.hasNext();) {
- final Element element = (Element) iterator.next();
+ for (Iterator<Element> iterator = spool.values().iterator(); iterator.hasNext();) {
+ final Element element = iterator.next();
if (element.isExpired()) {
// An expired element
if (LOG.isLoggable(Level.FINE)) {
@@ -963,8 +997,8 @@
Element element = null;
RegisteredEventListeners listeners = cache.getCacheEventNotificationService();
// Clean up disk elements
- for (Iterator iterator = diskElements.entrySet().iterator(); iterator.hasNext();) {
- final Map.Entry entry = (Map.Entry) iterator.next();
+ for (Iterator<Map.Entry<Object, DiskElement>> iterator = diskElements.entrySet().iterator(); iterator.hasNext();) {
+ final Map.Entry<Object, DiskElement> entry = iterator.next();
final DiskElement diskElement = (DiskElement) entry.getValue();
if (now >= diskElement.expiryTime) {
@@ -1045,7 +1079,7 @@
* Copies of expiryTime and hitcount are held here as a performance optimisation, so
* that we do not need to load the data from Disk to get this often used information.
*/
- private static final class DiskElement implements Serializable {
+ public static final class DiskElement implements Serializable {
private static final long serialVersionUID = -717310932566592289L;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment