tested version
diff --git a/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py b/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
index 98b2004..df6f0bc 100644
--- a/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
+++ b/cloudfusion/cloudfusion/pyfusebox/configurable_pyfusebox.py
@@ -13,6 +13,7 @@ from cloudfusion.store.dropbox.dropbox_store import DropboxStore
 from cloudfusion.store.sugarsync.sugarsync_store import SugarsyncStore
 from cloudfusion.store.caching_store import CachingStore
 from cloudfusion.store.metadata_caching_store import MetadataCachingStore
+import random
 
 class ConfigurablePyFuseBox(FlushingPyFuseBox):
@@ -133,22 +134,20 @@ class ConfigurablePyFuseBox(FlushingPyFuseBox):
         conf = self.virtual_file.get_store_config_data()
         service = conf['name']
         self.logger.debug("got service name")
-        cache_time = 0
-        metadata_cache_time = 0
-        if 'cache' in conf:
-            cache_time = int(conf['cache'])
-        if 'metadata_cache' in conf:
-            metadata_cache_time = int(conf['metadata_cache'])
+        cache_time = int(conf.get('cache', 0))
+        metadata_cache_time = int(conf.get('metadata_cache', 0))
+        cache_size = int(conf.get('cache_size', 2000))
+        cache_id = str(conf.get('cache_id', random.random()))
         self.logger.debug("got cache parameter")
         auth = self.virtual_file.get_service_auth_data()
         self.logger.debug("got auth data: "+str(auth))
         store = self.__get_new_store(service, auth) #catch error?
         self.logger.debug("initialized store")
         if cache_time > 0 and metadata_cache_time > 0:
-            store = MetadataCachingStore( CachingStore( MetadataCachingStore( store, metadata_cache_time ), cache_time ), metadata_cache_time )
+            store = MetadataCachingStore( CachingStore( MetadataCachingStore( store, metadata_cache_time ), cache_time, cache_size, cache_id ), metadata_cache_time )
             self.set_cache_expiration_time(cache_time)
         elif cache_time > 0:
-            store = CachingStore(store, cache_time)
+            store = CachingStore(store, cache_time, cache_size, cache_id)
             self.set_cache_expiration_time(cache_time)
         elif metadata_cache_time > 0:
             store = MetadataCachingStore( store, metadata_cache_time )
@@ -228,4 +227,4 @@ class ConfigurablePyFuseBox(FlushingPyFuseBox):
     def chown(self, path, uid, gid):
         raise FuseOSError(EROFS)
     """
-
\ No newline at end of file
+
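The main change in this file is that the cache parameters are now read with dict.get() and defaults instead of if-in checks, and two new options, cache_size and cache_id, are passed down to CachingStore. A standalone sketch of how those defaults behave; the conf dictionary below is made up for illustration and is not part of the patch:

import random

conf = {'name': 'dropbox', 'cache': '30'}                 # hypothetical parsed config
cache_time = int(conf.get('cache', 0))                    # 30: key present, value converted
metadata_cache_time = int(conf.get('metadata_cache', 0))  # 0: falls back to the default
cache_size = int(conf.get('cache_size', 2000))            # default cache limit of 2000 MB
cache_id = str(conf.get('cache_id', random.random()))     # fresh random id if none configured
assert (cache_time, metadata_cache_time, cache_size) == (30, 0, 2000)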
diff --git a/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py b/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
index b17b579..79118ed 100644
--- a/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
+++ b/cloudfusion/cloudfusion/pyfusebox/flushing_pyfusebox.py
@@ -1,3 +1,5 @@
+# Obsolete: the LRU cache is now realized in a separate module.
+# TODO: remove this class from the inheritance path.
 from cloudfusion.pyfusebox.pyfusebox import *
 import threading
 import time
diff --git a/cloudfusion/cloudfusion/store/caching_store.py b/cloudfusion/cloudfusion/store/caching_store.py
index 9c14a02..b0e69e3 100644
--- a/cloudfusion/cloudfusion/store/caching_store.py
+++ b/cloudfusion/cloudfusion/store/caching_store.py
@@ -10,25 +10,28 @@ Created on 08.04.2011
 '''
 import tempfile
-from cloudfusion.util.cache import Cache
+from cloudfusion.util.persistent_lru_cache import PersistentLRUCache
 from cloudfusion.store.dropbox.file_decorator import *
 from cloudfusion.store.store import *
 import time
 import logging
+import random
 
 """Wrapped store needs a logger as an attribute called logger """
 class CachingStore(Store):
-    def __init__(self, store, cache_expiration_time):
+    def __init__(self, store, cache_expiration_time, cache_size_in_mb=2000, cache_id=None):
         """:param store: the store whose access should be cached
-        :param cache_expiration_time: the time in seconds until any cache entry is expired"""
+        :param cache_expiration_time: the time in seconds until any cache entry is expired
+        :param cache_size_in_mb: approximate limit of the cache in MB
+        :param cache_id: identifier for a persistent cache instance; a fresh random id is chosen if none is given"""
         self.store = store
         self.logger = logging.getLogger(self.get_logging_handler())
         self.logger.debug("creating CachingStore object")
 #        self.temp_file = tempfile.SpooledTemporaryFile()
         self.cache_expiration_time = cache_expiration_time
         self.time_of_last_flush = time.time()
-        self.entries = Cache(cache_expiration_time)
+        if cache_id is None: #choose the default per instance; a default argument of str(random.random()) would be evaluated only once, at definition time
+            cache_id = str(random.random())
+        self.entries = PersistentLRUCache(cache_expiration_time, cache_size_in_mb, "/tmp/cachingstore_"+cache_id)
     def get_cache_expiration_time(self):
         """:returns: the time in seconds until any cache entry is expired"""
@@ -125,7 +128,7 @@ class CachingStore(Store):
             self.logger.debug("cached storing value %s..." %self.entries.get_value(path)[:10])
         if flush:
             self.logger.debug("cache entry for %s is expired -> flushing" % path)
-            self._flush(path)
+            self.__flush(path)
 
     def delete(self, path):#delete from metadata
         self.entries.delete(path)
@@ -203,6 +206,7 @@ class CachingStore(Store):
         self.time_of_last_flush = time.time()
         for path in self.entries.get_keys():
             self.__flush(path)
+        self.logger.debug("flushed entries")
 
     def __flush(self, path):
         """ Writes the entry with the key :param:`path` to the wrapped store, only if it is dirty."""
@@ -212,10 +216,12 @@ class CachingStore(Store):
         if self.entries.is_dirty(path):
             file = DataFileWrapper(self.entries.get_value(path))
             file.fileno()#
+            self.logger.debug("flushing %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
             self.store.store_fileobject(file, path)
+            self.logger.debug("flushed %s 1" % path)
             self.entries.flush(path)
-            self.logger.debug("flushing %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
+            self.logger.debug("flushed %s with content starting with %s" % (path, self.entries.get_value(path)[0:10]))
 
     def get_logging_handler(self):
-        return self.store.get_logging_handler()
\ No newline at end of file
+        return self.store.get_logging_handler()
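The signature in this hunk originally read cache_id=str(random.random()), which is a classic pitfall: Python evaluates default arguments once, when the def statement runs, so every CachingStore created without an explicit cache_id would have shared the same /tmp/cachingstore_* directory. The version above defers the choice to call time instead. A minimal standalone demonstration of the difference; both functions are invented for illustration:

import random

def make_id_bad(cache_id=str(random.random())):    # default computed once, at definition time
    return cache_id

def make_id_good(cache_id=None):                   # default computed freshly on each call
    if cache_id is None:
        cache_id = str(random.random())
    return cache_id

assert make_id_bad() == make_id_bad()              # always the same id
assert make_id_good() != make_id_good()            # distinct ids (with overwhelming probability)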
diff --git a/cloudfusion/cloudfusion/util/cache.py b/cloudfusion/cloudfusion/util/cache.py
index 8c1dcd8..50af129 100644
--- a/cloudfusion/cloudfusion/util/cache.py
+++ b/cloudfusion/cloudfusion/util/cache.py
@@ -7,6 +7,7 @@ import time
 
 class Cache(object):
     def __init__(self, expiration_time):
+        """Return a Cache instance with entries expiring after :param:`expiration_time` seconds."""
        self.entries = {}
        self.expire = expiration_time
@@ -32,34 +33,31 @@ class Cache(object):
         self.entries[key] = entry
 
     def get_keys(self):
-        return self.entries
+        return self.entries.keys()
 
     def get_modified(self, key):
         return self.entries[key]['modified']
+    def _get_size_of_entry(self, entry):
+        """Probe the entry's value for its size: a get_size() method, a size attribute, or the length of its string representation."""
+        try:
+            return entry['value'].get_size()
+        except AttributeError:
+            try:
+                return entry['value'].size
+            except AttributeError:
+                return len(str(entry['value']))
+
     def get_size_of_dirty_data(self):
         ret = 0
         for entry in self.entries:
             if self.is_dirty(entry):
-                try:
-                    ret+= self.get_value(entry).get_size()
-                except:
-                    try:
-                        ret+= self.get_value(entry).size
-                    except:
-                        ret += len(str(self.get_value(entry)))
+                ret += self._get_size_of_entry(self.entries[entry])
         return ret
 
     def get_size_of_cached_data(self):
         ret = 0
         for entry in self.entries:
-            try:
-                ret+= self.get_value(entry).get_size()
-            except:
-                try:
-                    ret+= self.get_value(entry).size
-                except:
-                    ret += len(str(self.get_value(entry)))
+            ret += self._get_size_of_entry(self.entries[entry])
         return ret
 
     def exists(self, key):
@@ -81,10 +79,16 @@ class Cache(object):
         return self.entries[key]['value']
 
     def is_dirty(self, key):
         return self.entries[key]['dirty']
     def set_dirty(self, key, is_dirty):
         self.entries[key]['dirty'] = is_dirty
 
     def delete(self, key):
         try:
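The refactoring above extracts the duplicated size computation into _get_size_of_entry, which probes the cached value in three steps: a get_size() method, a size attribute, and finally the length of the string representation. A standalone sketch of the same duck typing; the function and sample value type are invented for illustration:

def get_size(value):
    # same probing order as Cache._get_size_of_entry
    try:
        return value.get_size()
    except AttributeError:
        try:
            return value.size
        except AttributeError:
            return len(str(value))

class FileLikeValue(object):   # hypothetical cached value exposing a size attribute
    size = 1024

print(get_size(FileLikeValue()))  # 1024, via the size attribute
print(get_size("hello"))          # 5, via the string fallback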
diff --git a/cloudfusion/cloudfusion/util/lru_cache.py b/cloudfusion/cloudfusion/util/lru_cache.py
new file mode 100644
index 0000000..8bcc334
--- /dev/null
+++ b/cloudfusion/cloudfusion/util/lru_cache.py
@@ -0,0 +1,138 @@
+'''
+Created on Feb 5, 2013
+
+@author: joe
+'''
+from cloudfusion.util.cache import Cache
+import time
+
+LISTHEAD = '########################## ######## list_head ###### #############################'
+LISTTAIL = '########################## ######## list_tail ###### #############################'
+CACHESIZE = '############################## #### cache_size ###### #############################'
+NEXT, PREV = (0,1)
+
+class LRUCache(Cache):
+    '''
+    LRU cache that keeps its recency list inside the entries dictionary:
+    each entry carries NEXT/PREV links, while the sentinel keys LISTHEAD,
+    LISTTAIL and CACHESIZE hold the list ends and the running size.
+    '''
+
+    def __init__(self, expiration_time, maxsize_in_MB):
+        """Return an LRUCache instance.
+        :param expiration_time: Time in seconds until entries are expired.
+        :param maxsize_in_MB: Approximate limit of the cache in MB.
+        """
+        super( LRUCache, self ).__init__(expiration_time)
+        #self.entries = shelve.open(filename)
+        #self.list_head = self.entries[LISTHEAD] if LISTHEAD in self.entries else None
+        #self.entries[LISTTAIL] = self.entries[LISTTAIL] if LISTTAIL in self.entries else None
+        self.maxsize = maxsize_in_MB
+        self.entries = {}
+        self.entries[LISTHEAD] = None
+        self.entries[LISTTAIL] = None
+        self.entries[CACHESIZE] = 0
+
+    def get_keys(self):
+        keys = super( LRUCache, self ).get_keys()
+        keys.remove(LISTHEAD)
+        keys.remove(LISTTAIL)
+        keys.remove(CACHESIZE)
+        return keys
+
+    def _move_used_entry_to_head(self, key):
+        """Put the existing entry associated with :param:`key` in front of the LRU queue."""
+        used_entry = self.entries[key]
+        if not used_entry[NEXT]: #entry is list head
+            return
+        self._unlink(key)
+        tmp = self.entries[LISTHEAD]
+        self.entries[LISTHEAD] = used_entry
+        used_entry[NEXT] = None
+        used_entry[PREV] = tmp
+        tmp[NEXT] = used_entry
+
+    def refresh(self, key, disk_value, modified):
+        """ Refreshes an entry with :param:`disk_value`, if :param:`modified` is bigger than the entry's modified date. """
+        if key in self.entries:
+            disk_entry_is_newer = modified > self.entries[key]['modified']
+            if not disk_entry_is_newer:
+                return
+            else: #the disk entry is newer, so remove the stale entry before inserting the fresh one
+                self.delete(key)
+        entry = {}
+        entry['value'] = disk_value
+        entry['updated'] = time.time()
+        entry['modified'] = modified
+        entry['dirty'] = False
+        tmp = self.entries[LISTHEAD]
+        self.entries[LISTHEAD] = entry
+        entry[NEXT] = None
+        entry[PREV] = tmp
+        if tmp: #tmp currently is list_head
+            tmp[NEXT] = entry
+        else: #if list_head is empty, this is the first element in the list -> set tail to first element
+            self.entries[LISTTAIL] = entry
+        self.entries[key] = entry
+        self.entries[CACHESIZE] += self._get_size_of_entry(entry)
+        self._resize()
+
+    def write(self, key, value):
+        self._unlink(key)
+        entry = {}
+        entry['value'] = value
+        entry['updated'] = time.time()
+        entry['modified'] = time.time()
+        entry['dirty'] = True
+        tmp = self.entries[LISTHEAD]
+        self.entries[LISTHEAD] = entry
+        entry[NEXT] = None
+        entry[PREV] = tmp
+        if tmp: #tmp currently is list_head
+            tmp[NEXT] = entry
+        else: #if list_head is empty, this is the first element in the list -> set tail to first element
+            self.entries[LISTTAIL] = entry
+        self.entries[key] = entry
+        self.entries[CACHESIZE] += self._get_size_of_entry(entry)
+        self._resize()
+
+    def get_size_of_dirty_data(self):
+        ret = 0
+        for key in self.get_keys():
+            if self.is_dirty(key):
+                ret += self._get_size_of_entry(self.entries[key])
+        return ret
+
+    def get_size_of_cached_data(self):
+        return self.entries[CACHESIZE]
+
+    def get_value(self, key):
+        self._move_used_entry_to_head(key)
+        return self.entries[key]['value']
+
+
+    def _resize(self):
+        """Resize cache to maxsize."""
+        while self.entries[CACHESIZE]/1000000 > self.maxsize and self.entries[LISTTAIL][NEXT]:
+            self.entries[CACHESIZE] -= self._get_size_of_entry(self.entries[LISTTAIL])
+            self.entries[LISTTAIL][NEXT][PREV] = None
+            self.entries[LISTTAIL] = self.entries[LISTTAIL][NEXT]
+
+    def delete(self, key):
+        """Remove current entry associated with key from the LRU queue."""
+        if key in self.entries:
+            entry = self.entries[key]
+            self.entries[CACHESIZE] -= self._get_size_of_entry(entry)
+            self._unlink(key)
+            del self.entries[key]
+
+    def _unlink(self, key):
+        if key in self.entries:
+            entry = self.entries[key]
+            if not entry[PREV]: #entry is list tail
+                self.entries[LISTTAIL] = entry[NEXT]
+            else:
+                entry[PREV][NEXT] = entry[NEXT]
+            if not entry[NEXT]: #entry is list head
+                self.entries[LISTHEAD] = entry[PREV]
+            else:
+                entry[NEXT][PREV] = entry[PREV]
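lru_cache.py keeps the recency list inside the entries dictionary itself: every entry is a dict that also carries NEXT/PREV links, and the sentinel keys LISTHEAD, LISTTAIL and CACHESIZE store the list ends and the running size. A compressed standalone sketch of that linked-list bookkeeping, independent of the cloudfusion classes; the names and sentinel strings below are invented:

NEXT, PREV = 0, 1
HEAD, TAIL = '#head#', '#tail#'    # sentinel keys, like LISTHEAD/LISTTAIL above

entries = {HEAD: None, TAIL: None}

def push_front(key, value):
    # new or freshly written entries enter at the head, like LRUCache.write()
    entry = {NEXT: None, PREV: entries[HEAD], 'value': value}
    if entries[HEAD]:
        entries[HEAD][NEXT] = entry
    else:
        entries[TAIL] = entry      # first element is head and tail at once
    entries[HEAD] = entry
    entries[key] = entry

def evict_tail():
    # eviction drops the least recently used element, like LRUCache._resize()
    tail = entries[TAIL]
    entries[TAIL] = tail[NEXT]
    if tail[NEXT]:
        tail[NEXT][PREV] = None

push_front('a', 1)
push_front('b', 2)
evict_tail()                       # 'a' was least recently used
print(entries[TAIL]['value'])      # 2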
diff --git a/cloudfusion/cloudfusion/util/persistent_lru_cache.py b/cloudfusion/cloudfusion/util/persistent_lru_cache.py
new file mode 100644
index 0000000..8579309
--- /dev/null
+++ b/cloudfusion/cloudfusion/util/persistent_lru_cache.py
@@ -0,0 +1,124 @@
+'''
+Created on Feb 5, 2013
+
+@author: joe
+'''
+from cloudfusion.util.lru_cache import *
+import shelve
+import hashlib
+import os
+import atexit
+
+LASTFILEID = "############################## #### file_id ###### #############################'"
+
+class PersistentLRUCache(LRUCache):
+    '''
+    LRU cache persisted to disk: the entry dictionary lives in a shelve
+    database, and each cached value is spilled to a file of its own in the
+    cache directory.
+    '''
+
+    def __init__(self, expiration_time, maxsize_in_MB, directory):
+        """Return a PersistentLRUCache instance.
+        :param expiration_time: Time in seconds until entries are expired.
+        :param maxsize_in_MB: Approximate limit of the cache in MB.
+        :param directory: Directory to store persistent data, also serves as identifier for a persistent cache instance.
+        """
+        super( PersistentLRUCache, self ).__init__(expiration_time, maxsize_in_MB)
+        self.filename = "Database"
+        self.directory = directory
+        try:
+            os.makedirs(directory)
+        except OSError: #the directory already exists
+            pass
+        self.entries = shelve.open(directory+"/"+self.filename, writeback=True)
+        atexit.register( lambda : self.entries.close() )
+        if not LISTHEAD in self.entries: #first time use
+            self.entries[LISTHEAD] = None
+            self.entries[LISTTAIL] = None
+            self.entries[CACHESIZE] = 0
+            self.entries[LASTFILEID] = 0
+
+    def get_keys(self):
+        keys = super( PersistentLRUCache, self ).get_keys()
+        keys.remove(LASTFILEID)
+        return keys
+
+    def refresh(self, key, disk_value, modified):
+        """ Refreshes an entry with :param:`disk_value`, if :param:`modified` is bigger than the entry's modified date. """
+        if key in self.entries:
+            disk_entry_is_newer = modified > self.entries[key]['modified']
+            if not disk_entry_is_newer:
+                return
+            filename = self.entries[key]['value']
+        else:
+            self.entries[LASTFILEID] += 1
+            filename = self.directory+"/"+str(self.entries[LASTFILEID])
+        self._write_to_file(filename, key, disk_value)
+        super( PersistentLRUCache, self ).refresh(key, filename, modified)
+
+    def _write_to_file(self, filename, key, value):
+        if key in self.entries: #the file might already exist; subtract its old size first
+            try:
+                self.entries[CACHESIZE] -= self._get_persistent_size(filename)
+            except OSError: #the file does not exist yet
+                pass
+        fh = open(filename,"w")
+        fh.write(value)
+        fh.close()
+        self.entries[CACHESIZE] += self._get_persistent_size(filename)
+
+    def _get_persistent_size(self, filepath):
+        return os.path.getsize(filepath)
+
+    def _get_file_content(self, filepath):
+        fh = open(filepath)
+        content = fh.read()
+        fh.close()
+        return content
+
+    def write(self, key, value):
+        if key in self.entries:
+            filename = self.entries[key]['value']
+        else:
+            self.entries[LASTFILEID] += 1
+            filename = self.directory+"/"+str(self.entries[LASTFILEID])
+        self._write_to_file(filename, key, value)
+        super( PersistentLRUCache, self ).write(key, filename)
+
+    def get_size_of_dirty_data(self):
+        ret = 0
+        for key in self.get_keys():
+            if self.is_dirty(key):
+                ret += self._get_persistent_size(self.entries[key]['value'])
+        return ret
+
+    def get_value(self, key):
+        return self._get_file_content(super( PersistentLRUCache, self ).get_value(key))
+
+
+    def _resize(self):
+        """Resize cache to maxsize."""
+        while self.entries[CACHESIZE]/1000000 > self.maxsize and self.entries[LISTTAIL][NEXT]:
+            self.entries[CACHESIZE] -= self._get_size_of_entry(self.entries[LISTTAIL])
+            self.entries[CACHESIZE] -= self._get_persistent_size(self.entries[LISTTAIL]['value'])
+            os.remove(self.entries[LISTTAIL]['value'])
+            self.entries[LISTTAIL][NEXT][PREV] = None
+            self.entries[LISTTAIL] = self.entries[LISTTAIL][NEXT]
+
+    def delete(self, key):
+        """Remove current entry associated with key from the LRU queue and delete its persistent representation."""
+        if key in self.entries:
+            entry = self.entries[key]
+            self.entries[CACHESIZE] -= self._get_persistent_size(entry['value'])
+            #os.remove(entry['value'])
+            super( PersistentLRUCache, self ).delete(key)
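Assuming the module layout from this patch, PersistentLRUCache would be used roughly as follows; the directory identifies the cache across restarts, since both the shelve database and the per-entry value files live there. The path, sizes, and key below are made up for illustration:

from cloudfusion.util.persistent_lru_cache import PersistentLRUCache

# 60 s expiry, ~10 MB budget; /tmp/democache identifies this cache instance
cache = PersistentLRUCache(60, 10, "/tmp/democache")
cache.write("/remote/file.txt", "hello world")  # the value is spilled to its own file
print(cache.get_value("/remote/file.txt"))      # read back through that file
print(cache.is_dirty("/remote/file.txt"))       # True until the entry is flushed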