Created
November 13, 2011 02:08
-
-
Save anaisbetts/1361481 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.IO; | |
| using System.Reactive.Concurrency; | |
| using System.Reactive.Linq; | |
| using System.Reactive.Subjects; | |
| using System.Reflection; | |
| using ReactiveUI; | |
namespace GitHub.Core
{
    /// <summary>
    /// An asynchronous key-value store backed by a directory on disk. Each
    /// key maps to one file (named after the MD5 hash of the key) and the
    /// most recent requests are memoized in memory.
    /// </summary>
    /// <remarks>
    /// Requests are memoized as the "replayed result of the request" (an
    /// AsyncSubject) rather than as raw byte arrays, so in-flight and
    /// already-completed requests are handled by the same code path.
    /// </remarks>
    public class PersistentBlobCache : IEnableLogger, IDisposable
    {
        // Capacity handed to MemoizingMRUCache.
        const int MaxMemoizedRequests = 20;

        // How long Dispose waits for outstanding requests before giving up.
        static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(30);

        // Private gate object; locking on 'this' would let external code
        // contend for (or deadlock on) the same monitor.
        readonly object _gate = new object();
        readonly IScheduler _scheduler;
        readonly string _cacheDirectory;

        // Set to null by Dispose; guarded by _gate.
        MemoizingMRUCache<string, AsyncSubject<byte[]>> _memoizedRequests;

        /// <summary>
        /// Creates a cache rooted at <paramref name="cacheDirectory"/>,
        /// creating the directory if it does not exist yet.
        /// </summary>
        /// <param name="cacheDirectory">Directory that holds the blobs, or
        /// null to use the default location.</param>
        /// <param name="scheduler">Scheduler that runs the disk I/O, or null
        /// to use RxApp.TaskpoolScheduler.</param>
        public PersistentBlobCache(string cacheDirectory = null, IScheduler scheduler = null)
        {
            _cacheDirectory = cacheDirectory ?? GetDefaultCacheDirectory();
            _scheduler = scheduler ?? RxApp.TaskpoolScheduler;

            _memoizedRequests = new MemoizingMRUCache<string, AsyncSubject<byte[]>>(
                FetchOrWriteBlobFromDisk, MaxMemoizedRequests);

            // Creates every missing directory along the path and does
            // nothing when the directory already exists.
            Directory.CreateDirectory(_cacheDirectory);
        }

        /// <summary>
        /// Queues a write of <paramref name="data"/> under
        /// <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Key to store the blob under.</param>
        /// <param name="data">Blob contents.</param>
        /// <exception cref="ArgumentNullException">key or data is null.</exception>
        /// <exception cref="ObjectDisposedException">The cache was disposed.</exception>
        public void Insert(string key, byte[] data)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            // A null payload would be indistinguishable from a read request
            // in FetchOrWriteBlobFromDisk, so reject it up front.
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            // NB: FetchOrWriteBlobFromDisk only hands work to the scheduler,
            // so this lock is not meant to be held across the disk I/O.
            lock (_gate)
            {
                ThrowIfDisposed();

                // NOTE(review): if 'key' is already memoized, Get presumably
                // returns the existing entry without invoking the write
                // delegate - confirm against MemoizingMRUCache.
                var request = _memoizedRequests.Get(key, data);
                InvalidateOnFailure(key, request);
            }
        }

        /// <summary>
        /// Returns an observable that yields the blob stored under
        /// <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Key to look up.</param>
        /// <returns>The memoized request for the key. A missing file on disk
        /// is reported through the observable as a
        /// <see cref="KeyNotFoundException"/>.</returns>
        /// <exception cref="ArgumentNullException">key is null.</exception>
        /// <exception cref="ObjectDisposedException">The cache was disposed.</exception>
        public IObservable<byte[]> GetAsync(string key)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            lock (_gate)
            {
                ThrowIfDisposed();

                // Three scenarios, all served by the same call:
                //
                // 1. The key is memoized as a completed request - the
                //    AsyncSubject replays its result.
                //
                // 2. The key is currently being fetched - the memoized
                //    AsyncSubject delivers the result once the I/O is done.
                //
                // 3. The key is not memoized - FetchOrWriteBlobFromDisk is
                //    invoked and returns an AsyncSubject for the queued read.
                var request = _memoizedRequests.Get(key);
                InvalidateOnFailure(key, request);
                return request;
            }
        }

        /// <summary>
        /// Waits (up to 30 seconds) for the memoized requests to finish so
        /// that outstanding writes are flushed, then releases the in-memory
        /// cache. Calling it more than once is a no-op.
        /// </summary>
        /// <remarks>
        /// Errors from the merged requests, including the timeout, are not
        /// caught here.
        /// </remarks>
        public void Dispose()
        {
            MemoizingMRUCache<string, AsyncSubject<byte[]>> cache;

            lock (_gate)
            {
                cache = _memoizedRequests;
                _memoizedRequests = null;
            }

            if (cache == null)
            {
                // Already disposed.
                return;
            }

            // The wait happens outside the lock: a failing request runs its
            // invalidation callback (which takes the lock) before this wait
            // is notified, so holding the lock here could stall until the
            // timeout.
            var outstanding = new List<IObservable<byte[]>>(cache.CachedValues());
            if (outstanding.Count == 0)
            {
                // Nothing to flush; avoid waiting on an empty sequence.
                return;
            }

            // Completed AsyncSubjects replay immediately; the remaining ones
            // complete when their I/O finishes.
            outstanding.Merge()
                .Timeout(FlushTimeout, _scheduler)
                .Wait();
        }

        // Drops 'key' from the in-memory cache when 'request' fails, so that
        // a later call retries the disk operation instead of replaying the
        // same failure.
        void InvalidateOnFailure(string key, IObservable<byte[]> request)
        {
            request.Subscribe(
                _ => { },
                ex =>
                {
                    // This may run on a scheduler thread, possibly after
                    // Dispose has already released the cache.
                    lock (_gate)
                    {
                        if (_memoizedRequests != null)
                        {
                            _memoizedRequests.Invalidate(key);
                        }
                    }
                });
        }

        // Must be called while holding _gate.
        void ThrowIfDisposed()
        {
            if (_memoizedRequests == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
        }

        // Factory delegate for MemoizingMRUCache. A non-null 'byteData'
        // context marks the request as a write (a slight abuse of the
        // cache's context parameter); otherwise the blob is read from disk.
        AsyncSubject<byte[]> FetchOrWriteBlobFromDisk(string key, object byteData)
        {
            if (byteData != null)
            {
                return WriteBlobToDisk(key, (byte[]) byteData);
            }

            var ret = new AsyncSubject<byte[]>();

            Observable.Start(() => File.ReadAllBytes(GetPathForKey(key)), _scheduler)
                .Catch<byte[], FileNotFoundException>(ex => Observable.Throw<byte[]>(new KeyNotFoundException()))
                .Multicast(ret).Connect();

            return ret;
        }

        // Queues a write of 'byteData' to the file for 'key' and returns a
        // subject that yields 'byteData' once the write has finished.
        AsyncSubject<byte[]> WriteBlobToDisk(string key, byte[] byteData)
        {
            var ret = new AsyncSubject<byte[]>();

            // NB: The subject only completes once the write has finished, so
            // an Insert immediately followed by a Get takes longer - but it
            // also means failed writes disappear from the cache, which is
            // A Good Thing.
            Observable.Start(() =>
                {
                    // WriteAllBytes replaces any existing file, so overwriting
                    // a key with a shorter blob leaves no stale trailing bytes
                    // (File.OpenWrite does not truncate).
                    File.WriteAllBytes(GetPathForKey(key), byteData);
                    return byteData;
                }, _scheduler)
                .Multicast(ret).Connect();

            return ret;
        }

        // Maps a key to its backing file: the MD5 hash of the key inside
        // the cache directory.
        string GetPathForKey(string key)
        {
            return Path.Combine(_cacheDirectory, CoreUtility.GetMd5Hash(key));
        }

        // Under the unit test runner the cache lives next to the executing
        // assembly; otherwise under LocalApplicationData\GitHub\BlobCache.
        static string GetDefaultCacheDirectory()
        {
            return RxApp.InUnitTestRunner() ?
                Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "BlobCache") :
                Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), "GitHub", "BlobCache");
        }
    }
}
namespace GitHub.Tests.Helpers
{
    /// <summary>
    /// Tests for PersistentBlobCache: in-process insert/get behaviour and
    /// persistence across cache instances.
    /// </summary>
    public class BlobCacheFixture
    {
        /// <summary>
        /// Inserted blobs can be read back, null keys are rejected, and an
        /// unknown key surfaces as KeyNotFoundException.
        /// </summary>
        [Test]
        public void CacheShouldBeAbleToGetAndInsertBlobs()
        {
            Scheduler.CurrentThread.With(sched =>
            {
                var cache = new PersistentBlobCache();

                cache.Insert("Foo", new byte[] { 1, 2, 3 });
                cache.Insert("Bar", new byte[] { 4, 5, 6 });

                Assert.Throws<ArgumentNullException>(
                    () => cache.Insert(null, new byte[] { 7, 8, 9 }));

                byte[] fooBytes = cache.GetAsync("Foo").First();
                byte[] barBytes = cache.GetAsync("Bar").First();

                Assert.Throws<ArgumentNullException>(
                    () => cache.GetAsync(null).First());

                Assert.Throws<KeyNotFoundException>(
                    () => cache.GetAsync("Baz").First());

                Assert.AreEqual(3, fooBytes.Length);
                Assert.AreEqual(3, barBytes.Length);
                Assert.AreEqual(1, fooBytes[0]);
                Assert.AreEqual(4, barBytes[0]);
            });
        }

        /// <summary>
        /// A blob written by one cache instance is readable by a second
        /// instance pointed at the same directory.
        /// </summary>
        [Test]
        public void CacheShouldBeRoundtrippable()
        {
            var assemblyDir = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
            var scratchDir = Path.Combine(assemblyDir, "PBCacheTest");

            // Start from a clean slate in case an earlier run left files behind.
            if (Directory.Exists(scratchDir))
            {
                Fixtures.DeleteDirectory(scratchDir);
            }

            // First instance writes the blob; disposing it ends the write phase.
            using (var writer = new PersistentBlobCache(scratchDir))
            {
                writer.Insert("Foo", new byte[] { 1, 2, 3 });
            }

            // Second instance must find the blob on disk.
            using (var reader = new PersistentBlobCache(scratchDir))
            {
                var roundTripped = reader.GetAsync("Foo").First();

                Assert.AreEqual(3, roundTripped.Length);
                Assert.AreEqual(1, roundTripped[0]);
            }

            Fixtures.DeleteDirectory(scratchDir);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment