Skip to content

Instantly share code, notes, and snippets.

@joe42
Last active December 12, 2015 06:38
Show Gist options
  • Save joe42/4730208 to your computer and use it in GitHub Desktop.
Save joe42/4730208 to your computer and use it in GitHub Desktop.
tested version
diff --git a/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py b/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
index 98b2004..df6f0bc 100644
--- a/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
+++ b/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
@@ -13,6 +13,7 @@ from cloudfusion.store.dropbox.dropbox_store import DropboxStore
from cloudfusion.store.sugarsync.sugarsync_store import SugarsyncStore
from cloudfusion.store.caching_store import CachingStore
from cloudfusion.store.metadata_caching_store import MetadataCachingStore
+import random
class ConfigurablePyFuseBox(FlushingPyFuseBox):
@@ -133,22 +134,20 @@ class ConfigurablePyFuseBox(FlushingPyFuseBox):
conf = self.virtual_file.get_store_config_data()
service = conf['name']
self.logger.debug("got service name")
- cache_time = 0
- metadata_cache_time = 0
- if 'cache' in conf:
- cache_time = int(conf['cache'])
- if 'metadata_cache' in conf:
- metadata_cache_time = int(conf['metadata_cache'])
+ cache_time = int(conf.get('cache', 0))
+ metadata_cache_time = int(conf.get('metadata_cache', 0))
+ cache_size = int(conf.get('cache_size', 2000))
+ cache_id = str(conf.get('cache_id', random.random()))
self.logger.debug("got cache parameter")
auth = self.virtual_file.get_service_auth_data()
self.logger.debug("got auth data: "+str(auth))
store = self.__get_new_store(service, auth) #catch error?
self.logger.debug("initialized store")
if cache_time > 0 and metadata_cache_time > 0:
- store = MetadataCachingStore( CachingStore( MetadataCachingStore( store, metadata_cache_time ), cache_time ), metadata_cache_time )
+ store = MetadataCachingStore( CachingStore( MetadataCachingStore( store, metadata_cache_time ), cache_time, cache_size, cache_id ), metadata_cache_time )
self.set_cache_expiration_time(cache_time)
elif cache_time > 0:
- store = CachingStore(store, cache_time)
+ store = CachingStore(store, cache_time, cache_size, cache_id)
self.set_cache_expiration_time(cache_time)
elif metadata_cache_time > 0:
store = MetadataCachingStore( store, metadata_cache_time )
@@ -228,4 +227,4 @@ class ConfigurablePyFuseBox(FlushingPyFuseBox):
def chown(self, path, uid, gid):
raise FuseOSError(EROFS)
"""
-
\ No newline at end of file
+
diff --git a/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py b/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
index b17b579..79118ed 100644
--- a/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
+++ b/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
@@ -1,3 +1,5 @@
+#Obsolete: LRU Cache is realized in distinguished module
+#TODO:Remove from inheritance path
from cloudfusion.pyfusebox.pyfusebox import *
import threading
import time
diff --git a/cloudfusion/cloudfusion/store/caching_store.py b/cloudfusion/cloudfusion/store/caching_store.py
index 9c14a02..b0e69e3 100644
--- a/cloudfusion/cloudfusion/store/caching_store.py
+++ b/cloudfusion/cloudfusion/store/caching_store.py
@@ -10,25 +10,28 @@ Created on 08.04.2011
'''
import tempfile
-from cloudfusion.util.cache import Cache
+from cloudfusion.util.persistent_lru_cache import PersistentLRUCache
from cloudfusion.store.dropbox.file_decorator import *
from cloudfusion.store.store import *
import time
import logging
+import random
"""Wrapped store needs a logger as an attribute called logger """
class CachingStore(Store):
- def __init__(self, store, cache_expiration_time):
+ def __init__(self, store, cache_expiration_time, cache_size_in_mb=2000, cache_id=str(random.random())):
"""":param store: the store whose access should be cached
- :param cache_expiration_time: the time in seconds until any cache entry is expired"""
+ :param cache_expiration_time: the time in seconds until any cache entry is expired
+ :param:`cache_size_in_mb`: Approximate limit of the cache in MB.
+ :param:`cache_id`: Serves as identifier for a persistent cache instance. """
self.store = store
self.logger = logging.getLogger(self.get_logging_handler())
self.logger.debug("creating CachingStore object")
# self.temp_file = tempfile.SpooledTemporaryFile()
self.cache_expiration_time = cache_expiration_time
self.time_of_last_flush = time.time()
- self.entries = Cache(cache_expiration_time)
+ self.entries = PersistentLRUCache(cache_expiration_time, cache_size_in_mb, "/tmp/cachingstore_"+cache_id)
def get_cache_expiration_time(self):
""":returns: the time in seconds until any cache entry is expired"""
@@ -125,7 +128,7 @@ class CachingStore(Store):
self.logger.debug("cached storing value %s..." %self.entries.get_value(path)[:10])
if flush:
self.logger.debug("cache entry for %s is expired -> flushing" % path)
- self._flush(path)
+ self.__flush(path)
def delete(self, path):#delete from metadata
self.entries.delete(path)
@@ -203,6 +206,7 @@ class CachingStore(Store):
self.time_of_last_flush = time.time()
for path in self.entries.get_keys():
self.__flush(path)
+ self.logger.debug("flushed entries")
def __flush(self, path):
""" Writes the entry with the key :param:`path` to the wrapped store, only if it is dirty."""
@@ -212,10 +216,12 @@ class CachingStore(Store):
if self.entries.is_dirty(path):
file = DataFileWrapper(self.entries.get_value(path))
file.fileno()#
+ self.logger.debug("flushing %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
self.store.store_fileobject(file, path)
+ self.logger.debug("flushed %s 1" % path)
self.entries.flush(path)
- self.logger.debug("flushing %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
+ self.logger.debug("flushed %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
def get_logging_handler(self):
- return self.store.get_logging_handler()
\ No newline at end of file
+ return self.store.get_logging_handler()
diff --git a/cloudfusion/cloudfusion/util/cache.py b/cloudfusion/cloudfusion/util/cache.py
index 8c1dcd8..50af129 100644
--- a/cloudfusion/cloudfusion/util/cache.py
+++ b/cloudfusion/cloudfusion/util/cache.py
@@ -7,6 +7,7 @@ import time
class Cache(object):
def __init__(self, expiration_time):
+ """Return a Cache instance with entries expiring after :param:`expiration_time` seconds."""
self.entries = {}
self.expire = expiration_time
@@ -32,34 +33,31 @@ class Cache(object):
self.entries[key] = entry
def get_keys(self):
- return self.entries
+ return self.entries.keys()
def get_modified(self, key):
return self.entries[key]['modified']
+ def _get_size_of_entry(self, entry):
+ try:
+ return entry['value'].get_size()
+ except:
+ try:
+ return entry['value'].size
+ except:
+ return len(str(entry['value']))
+
def get_size_of_dirty_data(self):
ret = 0
for entry in self.entries:
if self.is_dirty(entry):
- try:
- ret+= self.get_value(entry).get_size()
- except:
- try:
- ret+= self.get_value(entry).size
- except:
- ret += len(str(self.get_value(entry)))
+ ret+= self._get_size_of_entry(entry)
return ret
def get_size_of_cached_data(self):
ret = 0
for entry in self.entries:
- try:
- ret+= self.get_value(entry).get_size()
- except:
- try:
- ret+= self.get_value(entry).size
- except:
- ret += len(str(self.get_value(entry)))
+ ret+= self._get_size_of_entry(entry)
return ret
def exists(self, key):
@@ -81,10 +79,16 @@ class Cache(object):
return self.entries[key]['value']
def is_dirty(self, key):
return self.entries[key]['dirty']
    def set_dirty(self, key, is_dirty):
        self.entries[key]['dirty'] = is_dirty
def delete(self, key):
try:
diff --git a/cloudfusion/cloudfusion/util/lru_cache.py b/cloudfusion/cloudfusion/util/lru_cache.py
new file mode 100644
index 0000000..8bcc334
--- /dev/null
+++ b/cloudfusion/cloudfusion/util/lru_cache.py
@@ -0,0 +1,138 @@
+'''
+Created on Feb 5, 2013
+
+@author: joe
+'''
+from cloudfusion.util.cache import Cache
+import time
+
+LISTHEAD = '########################## ######## list_head ###### #############################'
+LISTTAIL = '########################## ######## list_tail ###### #############################'
+CACHESIZE = '############################## #### cache_size ###### #############################'
+NEXT, PREV = (0,1)
+
class LRUCache(Cache):
    '''
    In-memory cache with LRU eviction and an approximate size limit.

    Entries form a doubly linked LRU queue threaded through the entry
    dicts themselves via the NEXT/PREV slots. The bookkeeping keys
    LISTHEAD, LISTTAIL and CACHESIZE live in the same dictionary as the
    real entries and are filtered out by get_keys().
    '''


    def __init__(self, expiration_time, maxsize_in_MB):
        """Return an LRUCache instance.
        :param:`expiration_time`: Time in seconds until entries are expired.
        :param:`maxsize_in_MB`: Approximate limit of the cache in MB.
        """
        super( LRUCache, self ).__init__(expiration_time)
        self.maxsize = maxsize_in_MB
        self.entries = {}
        self.entries[LISTHEAD] = None
        self.entries[LISTTAIL] = None
        self.entries[CACHESIZE] = 0


    def get_keys(self):
        """Return the keys of the real cache entries, excluding the bookkeeping sentinels."""
        keys = super( LRUCache, self ).get_keys()
        keys.remove(LISTHEAD)
        keys.remove(LISTTAIL)
        keys.remove(CACHESIZE)
        return keys

    def _move_used_entry_to_head(self, key):
        """Put the existing entry associated with :param:`key` in front of the LRU queue."""
        used_entry = self.entries[key]
        if not used_entry[NEXT]: #entry already is the list head
            return
        self._unlink(key)
        tmp = self.entries[LISTHEAD]
        self.entries[LISTHEAD] = used_entry
        used_entry[NEXT] = None
        used_entry[PREV] = tmp
        tmp[NEXT] = used_entry

    def _link_as_head(self, key, entry):
        """Insert *entry* as the new queue head, account for its size, and resize."""
        tmp = self.entries[LISTHEAD]
        self.entries[LISTHEAD] = entry
        entry[NEXT] = None
        entry[PREV] = tmp
        if tmp: #tmp was the list head so far
            tmp[NEXT] = entry
        else: #list was empty -> first element also becomes the tail
            self.entries[LISTTAIL] = entry
        self.entries[key] = entry
        self.entries[CACHESIZE] += self._get_size_of_entry(entry)
        self._resize()

    def refresh(self, key, disk_value, modified):
        """ Refreshes an entry with :param:`disk_value`, if :param:`modified` is bigger than the entry's modified date. """
        if key in self.entries:
            disk_entry_is_newer = modified > self.entries[key]['modified']
            if not disk_entry_is_newer:
                return
            self.delete(key) #unlink the stale entry before inserting the fresh one
        entry = {}
        entry['key'] = key #stored so _resize can evict the tail by key
        entry['value'] = disk_value
        entry['updated'] = time.time()
        entry['modified'] = modified
        entry['dirty'] = False
        self._link_as_head(key, entry)

    def write(self, key, value):
        """Store *value* under *key*, mark it dirty, and make it the most recently used entry."""
        if key in self.entries:
            #rewriting an existing key: subtract the old entry's size so it is not counted twice
            self.entries[CACHESIZE] -= self._get_size_of_entry(self.entries[key])
        self._unlink(key)
        entry = {}
        entry['key'] = key #stored so _resize can evict the tail by key
        entry['value'] = value
        entry['updated'] = time.time()
        entry['modified'] = time.time()
        entry['dirty'] = True
        self._link_as_head(key, entry)

    def get_size_of_dirty_data(self):
        """Return the summed approximate size of all entries that are not yet flushed."""
        ret = 0
        for key in self.get_keys(): #get_keys() skips the sentinel keys
            if self.is_dirty(key):
                ret += self._get_size_of_entry(self.entries[key])
        return ret

    def get_size_of_cached_data(self):
        """Return the approximate total size of the cached data."""
        return self.entries[CACHESIZE]

    def get_value(self, key):
        """Return the value for :param:`key` and mark it as most recently used."""
        self._move_used_entry_to_head(key)
        return self.entries[key]['value']


    def _resize(self):
        """Evict least recently used entries until the cache fits into maxsize."""
        #entry sizes are approximately bytes, hence the division by 10**6 to
        #compare against the limit in MB (matches PersistentLRUCache._resize)
        while self.entries[LISTTAIL] and self.entries[CACHESIZE]/1000000 > self.maxsize:
            self.delete(self.entries[LISTTAIL]['key'])

    def delete(self, key):
        """Remove the entry associated with key from the LRU queue and the size accounting."""
        if key in self.entries:
            entry = self.entries[key]
            self.entries[CACHESIZE] -= self._get_size_of_entry(entry)
            self._unlink(key)
            del self.entries[key]

    def _unlink(self, key):
        """Detach the entry for :param:`key` from the queue, fixing head/tail pointers."""
        if key in self.entries:
            entry = self.entries[key]
            if not entry[PREV]: #entry is the list tail
                self.entries[LISTTAIL] = entry[NEXT]
            else:
                entry[PREV][NEXT] = entry[NEXT]
            if not entry[NEXT]: #entry is the list head
                self.entries[LISTHEAD] = entry[PREV]
            else:
                entry[NEXT][PREV] = entry[PREV]
diff --git a/cloudfusion/cloudfusion/util/persistent_lru_cache.py b/cloudfusion/cloudfusion/util/persistent_lru_cache.py
new file mode 100644
index 0000000..8579309
--- /dev/null
+++ b/cloudfusion/cloudfusion/util/persistent_lru_cache.py
@@ -0,0 +1,124 @@
+'''
+Created on Feb 5, 2013
+
+@author: joe
+'''
+from cloudfusion.util.lru_cache import *
+import shelve
+import hashlib
+import os
+import atexit
+
+LASTFILEID = "############################## #### file_id ###### #############################'"
+
class PersistentLRUCache(LRUCache):
    '''
    LRU cache that keeps each entry's payload in its own file on disk and
    persists the bookkeeping dictionary with shelve. The in-memory entry
    value is the path of the data file, not the payload itself.
    '''


    def __init__(self, expiration_time, maxsize_in_MB, directory):
        """Return a PersistentLRUCache instance.
        :param:`expiration_time`: Time in seconds until entries are expired.
        :param:`maxsize_in_MB`: Approximate limit of the cache in MB.
        :param:`directory`: Directory to store persistent data, also serves as identifier for a persistent cache instance.
        """
        super( PersistentLRUCache, self ).__init__(expiration_time, maxsize_in_MB)
        self.filename = "Database"
        self.directory = directory
        try:
            os.makedirs(directory)
        except OSError: #directory already exists
            pass
        #writeback=True so in-place mutations of the entry dicts reach the shelf
        #NOTE(review): entries reference each other via NEXT/PREV; shelve pickles
        #each key independently, so the queue links are duplicated rather than
        #shared after reopening -- confirm persistence restores a usable queue.
        self.entries = shelve.open(self.directory+"/"+self.filename, writeback=True)
        atexit.register( lambda : self.entries.close() )
        if not LISTHEAD in self.entries: #first time use
            self.entries[LISTHEAD] = None
            self.entries[LISTTAIL] = None
            self.entries[CACHESIZE] = 0
            self.entries[LASTFILEID] = 0

    def get_keys(self):
        """Return the keys of the real cache entries, also excluding the LASTFILEID sentinel."""
        keys = super( PersistentLRUCache, self ).get_keys()
        keys.remove(LASTFILEID)
        return keys

    def _get_data_filepath(self, key):
        """Return the data file already assigned to :param:`key`, or allocate a fresh filename."""
        if key in self.entries:
            return self.entries[key]['value']
        self.entries[LASTFILEID] += 1
        return self.directory+"/"+str(self.entries[LASTFILEID])

    def refresh(self, key, disk_value, modified):
        """ Refreshes an entry with :param:`disk_value`, if :param:`modified` is bigger than the entry's modified date. """
        if key in self.entries:
            disk_entry_is_newer = modified > self.entries[key]['modified']
            if not disk_entry_is_newer:
                return
        filename = self._get_data_filepath(key)
        self._write_to_file(filename, key, disk_value)
        super( PersistentLRUCache, self ).refresh(key, filename, modified)

    def _write_to_file(self, filename, key, value):
        """Write *value* to *filename*, keeping the CACHESIZE total consistent on overwrite."""
        if key in self.entries: #overwriting: subtract the old file's size first
            try:
                self.entries[CACHESIZE] -= self._get_persistent_size(filename)
            except OSError: #data file does not exist yet; nothing to subtract
                pass
        fh = open(filename,"w")
        fh.write(value)
        fh.close()
        self.entries[CACHESIZE] += self._get_persistent_size(filename)

    def _get_persistent_size(self, filepath):
        """Return the on-disk size of *filepath* in bytes."""
        return os.path.getsize(filepath)

    def _get_file_content(self, filepath):
        """Read and return the whole content of *filepath*."""
        fh = open(filepath)
        content = fh.read()
        fh.close()
        return content

    def write(self, key, value):
        """Write *value* to the entry's data file and register the file path in the queue."""
        filename = self._get_data_filepath(key)
        self._write_to_file(filename, key, value)
        super( PersistentLRUCache, self ).write(key, filename)

    def get_size_of_dirty_data(self):
        """Return the summed on-disk size of all entries that are not yet flushed."""
        ret = 0
        for key in self.get_keys(): #get_keys() skips the sentinel keys
            if self.is_dirty(key):
                ret += self._get_persistent_size(self.entries[key]['value'])
        return ret

    def get_value(self, key):
        """Return the payload of :param:`key` by reading its data file."""
        return self._get_file_content(super( PersistentLRUCache, self ).get_value(key))


    def _resize(self):
        """Evict least recently used entries, including their data files, until the cache fits maxsize."""
        while self.entries[LISTTAIL] and self.entries[LISTTAIL][NEXT] \
                and self.entries[CACHESIZE]/1000000 > self.maxsize:
            tail = self.entries[LISTTAIL]
            self.entries[CACHESIZE] -= self._get_size_of_entry(tail)
            self.entries[CACHESIZE] -= self._get_persistent_size(tail['value'])
            os.remove(tail['value'])
            tail[NEXT][PREV] = None
            self.entries[LISTTAIL] = tail[NEXT]
            #NOTE(review): the evicted key still maps to the dead entry in
            #self.entries -- confirm callers tolerate this stale mapping, or
            #also remove the key -> entry mapping here.

    def delete(self, key):
        """Remove current entry associated with key from the LRU queue and the size accounting."""
        if key in self.entries:
            entry = self.entries[key]
            self.entries[CACHESIZE] -= self._get_persistent_size(entry['value'])
            #os.remove is intentionally not called here: refresh() rewrites the
            #data file before the superclass deletes the stale entry, so the
            #file must survive delete(); files are unlinked only in _resize().
            super( PersistentLRUCache, self ).delete(key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment