Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created November 13, 2011 02:08
Show Gist options
  • Select an option

  • Save anaisbetts/1361481 to your computer and use it in GitHub Desktop.

Select an option

Save anaisbetts/1361481 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using ReactiveUI;
namespace GitHub.Core
{
    /// <summary>
    /// This class represents an asynchronous key-value store backed by a
    /// directory. It keeps the results of the last 20 key requests (reads
    /// or writes) in memory.
    /// </summary>
    public class PersistentBlobCache : IEnableLogger, IDisposable
    {
        // Guards every access to _memoizedRequests. We lock on a private
        // object rather than 'this' so outside code that happens to lock the
        // cache instance cannot interfere with (or deadlock) our locking.
        readonly object _gate = new object();

        // Set to null by Dispose; a null value means "disposed".
        MemoizingMRUCache<string, AsyncSubject<byte[]>> _memoizedRequests;
        readonly IScheduler _scheduler;
        readonly string _cacheDirectory;

        /// <summary>
        /// Creates a cache rooted at <paramref name="cacheDirectory"/>,
        /// creating the directory if it does not exist yet.
        /// </summary>
        /// <param name="cacheDirectory">Directory that holds the blobs; when
        /// null, GetDefaultCacheDirectory() is used.</param>
        /// <param name="scheduler">Scheduler that disk I/O is queued on; when
        /// null, RxApp.TaskpoolScheduler is used.</param>
        public PersistentBlobCache(string cacheDirectory = null, IScheduler scheduler = null)
        {
            _cacheDirectory = cacheDirectory ?? GetDefaultCacheDirectory();
            _scheduler = scheduler ?? RxApp.TaskpoolScheduler;

            // Here, we're not actually caching the requests directly (i.e. as
            // byte[]s), but as the "replayed result of the request", in the
            // AsyncSubject - this makes the code infinitely simpler because
            // we don't have to keep a separate list of "in-flight reads" vs
            // "already completed and cached reads"
            _memoizedRequests = new MemoizingMRUCache<string, AsyncSubject<byte[]>>(
                FetchOrWriteBlobFromDisk, 20);

            if (!Directory.Exists(_cacheDirectory))
            {
                (new DirectoryInfo(_cacheDirectory)).CreateRecursive();
            }
        }

        /// <summary>
        /// Queues a write of <paramref name="data"/> under
        /// <paramref name="key"/>, replacing any previous value for that key
        /// both in memory and on disk. This method does not wait for the
        /// write to finish.
        /// </summary>
        /// <param name="key">The key to store the blob under.</param>
        /// <param name="data">The blob contents.</param>
        /// <exception cref="ArgumentNullException">key or data is null.</exception>
        /// <exception cref="ObjectDisposedException">The cache was disposed.</exception>
        public void Insert(string key, byte[] data)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            // A null context would make FetchOrWriteBlobFromDisk treat this
            // "write" as a read, so reject it explicitly.
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            // NB: With an asynchronous scheduler, FetchOrWriteBlobFromDisk
            // only queues work, so we never sit on this lock for any real
            // length of time
            lock (_gate)
            {
                ThrowIfDisposed();

                // MemoizingMRUCache.Get returns the already-cached entry when
                // the key is present and never calls FetchOrWriteBlobFromDisk,
                // which would silently drop this write. Evict the old entry
                // first so the write is always dispatched.
                _memoizedRequests.Invalidate(key);
                var request = _memoizedRequests.Get(key, data);

                InvalidateOnError(key, request);
            }
        }

        /// <summary>
        /// Returns an observable that yields the blob stored under
        /// <paramref name="key"/>, or fails with KeyNotFoundException if no
        /// blob file exists for it.
        /// </summary>
        /// <param name="key">The key to look up.</param>
        /// <exception cref="ArgumentNullException">key is null.</exception>
        /// <exception cref="ObjectDisposedException">The cache was disposed.</exception>
        public IObservable<byte[]> GetAsync(string key)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }

            lock (_gate)
            {
                ThrowIfDisposed();

                // There are three scenarios here, and we handle all of them
                // with aplomb and elegance:
                //
                // 1. The key is already in memory as a completed request - we
                // return the AsyncSubject which will replay the result
                //
                // 2. The key is currently being fetched from disk - in this
                // case, MemoizingMRUCache has an AsyncSubject for it (since
                // FetchOrWriteBlobFromDisk returns immediately), and the
                // client will get the result when the disk I/O completes
                //
                // 3. The key isn't in memory and isn't being fetched - in
                // this case, FetchOrWriteBlobFromDisk will be called which
                // will immediately return an AsyncSubject representing the
                // queued disk read.
                var ret = _memoizedRequests.Get(key);

                InvalidateOnError(key, ret);
                return ret;
            }
        }

        /// <summary>
        /// Waits for outstanding reads and writes to finish, then releases
        /// the in-memory cache. Calling Dispose more than once is harmless.
        /// The wait is bounded by Rx's Timeout operator (30 seconds), which
        /// surfaces as a TimeoutException.
        /// </summary>
        public void Dispose()
        {
            var outstanding = new List<IObservable<byte[]>>();

            lock (_gate)
            {
                if (_memoizedRequests == null)
                {
                    return;
                }

                // Since these are all AsyncSubjects, most of them will replay
                // immediately, except for the ones still outstanding. A
                // request that already failed is swallowed: Dispose waits for
                // pending work, it does not rethrow earlier failures.
                foreach (var request in _memoizedRequests.CachedValues())
                {
                    outstanding.Add(request.Catch(Observable.Empty<byte[]>()));
                }

                _memoizedRequests = null;
            }

            // Wait OUTSIDE the lock. The error handlers registered by
            // InvalidateOnError take _gate on the I/O scheduler's thread;
            // holding the lock here would block them, and every later
            // observer of the same AsyncSubject, until the timeout fired.
            // DefaultIfEmpty keeps Wait() from throwing
            // InvalidOperationException when nothing produced a value.
            outstanding
                .Merge()
                .Timeout(TimeSpan.FromSeconds(30), _scheduler)
                .DefaultIfEmpty()
                .Wait();
        }

        // Throws ObjectDisposedException once Dispose has cleared the cache.
        // Callers must hold _gate.
        void ThrowIfDisposed()
        {
            if (_memoizedRequests == null)
            {
                throw new ObjectDisposedException("PersistentBlobCache");
            }
        }

        // If we fail trying to fetch/write the key on disk, we want to try
        // again instead of replaying the same failure, so evict the key from
        // the in-memory cache when the request errors.
        void InvalidateOnError(string key, IObservable<byte[]> request)
        {
            request.Subscribe(x => { }, ex =>
            {
                // The error may be delivered on the I/O scheduler's thread,
                // and MemoizingMRUCache is not thread-safe, so take the same
                // lock as every other cache access. After Dispose there is
                // nothing left to invalidate.
                lock (_gate)
                {
                    if (_memoizedRequests != null)
                    {
                        _memoizedRequests.Invalidate(key);
                    }
                }
            });
        }

        // MemoizingMRUCache calculation function. A non-null byteData means
        // this is secretly a write (we're kind of abusing the 'context'
        // variable from MemoizingMRUCache here a bit); otherwise it queues a
        // read of the key's file. Either way it returns at once with an
        // AsyncSubject that replays the eventual result.
        AsyncSubject<byte[]> FetchOrWriteBlobFromDisk(string key, object byteData)
        {
            if (byteData != null)
            {
                return WriteBlobToDisk(key, (byte[]) byteData);
            }

            var ret = new AsyncSubject<byte[]>();

            Observable.Start(() =>
                {
                    using (var fd = File.OpenRead(GetPathForKey(key)))
                    using (var ms = new MemoryStream())
                    {
                        fd.CopyTo(ms);
                        return ms.ToArray();
                    }
                }, _scheduler)
                // A missing blob file means the key was never stored.
                .Catch<byte[], FileNotFoundException>(ex => Observable.Throw<byte[]>(new KeyNotFoundException()))
                .Multicast(ret).Connect();

            return ret;
        }

        // Queues a write of byteData to the key's file and returns an
        // AsyncSubject that yields byteData once the write has completed.
        AsyncSubject<byte[]> WriteBlobToDisk(string key, byte[] byteData)
        {
            var ret = new AsyncSubject<byte[]>();

            // NB: The fact that our writing AsyncSubject waits until the
            // write actually completes means that an Insert immediately
            // followed by a Get will take longer to process - however,
            // this also means that failed writes will disappear from the
            // cache, which is A Good Thing.
            Observable.Start(() =>
                {
                    // File.Create (FileMode.Create) truncates an existing
                    // file. File.OpenWrite (FileMode.OpenOrCreate) does not,
                    // and would leave stale trailing bytes whenever the new
                    // blob is shorter than the old one.
                    using (var fd = File.Create(GetPathForKey(key)))
                    {
                        fd.Write(byteData, 0, byteData.Length);
                    }

                    return byteData;
                }, _scheduler)
                .Multicast(ret).Connect();

            return ret;
        }

        // Blob files are named by the MD5 hash of their key (via
        // CoreUtility.GetMd5Hash) inside the cache directory.
        string GetPathForKey(string key)
        {
            return Path.Combine(_cacheDirectory, CoreUtility.GetMd5Hash(key));
        }

        // Under a unit-test runner, use a BlobCache folder next to the
        // executing assembly; otherwise use
        // %LocalApplicationData%\GitHub\BlobCache.
        static string GetDefaultCacheDirectory()
        {
            return RxApp.InUnitTestRunner() ?
                Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "BlobCache") :
                Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), "GitHub", "BlobCache");
        }
    }
}
namespace GitHub.Tests.Helpers
{
    /// <summary>
    /// Tests for PersistentBlobCache covering basic insert/get behaviour,
    /// argument validation, missing keys, and persistence of a blob across
    /// two cache instances that share one directory.
    /// </summary>
    public class BlobCacheFixture
    {
        [Test]
        public void CacheShouldBeAbleToGetAndInsertBlobs()
        {
            // NOTE(review): the project's With() helper presumably makes the
            // CurrentThread scheduler the ambient RxApp scheduler for the
            // duration of the lambda - confirm against its definition.
            Scheduler.CurrentThread.With(sched =>
            {
                var cache = new PersistentBlobCache();

                cache.Insert("Foo", new byte[] { 1, 2, 3 });
                cache.Insert("Bar", new byte[] { 4, 5, 6 });
                Assert.Throws<ArgumentNullException>(
                    () => cache.Insert(null, new byte[] { 7, 8, 9 }));

                byte[] fooBlob = cache.GetAsync("Foo").First();
                byte[] barBlob = cache.GetAsync("Bar").First();

                Assert.Throws<ArgumentNullException>(
                    () => cache.GetAsync(null).First());
                Assert.Throws<KeyNotFoundException>(
                    () => cache.GetAsync("Baz").First());

                Assert.AreEqual(3, fooBlob.Length);
                Assert.AreEqual(3, barBlob.Length);
                Assert.AreEqual(1, fooBlob[0]);
                Assert.AreEqual(4, barBlob[0]);
            });
        }

        [Test]
        public void CacheShouldBeRoundtrippable()
        {
            string cacheDir = GetRoundtripDirectory();

            // Start from an empty directory so earlier runs cannot leak in.
            if (Directory.Exists(cacheDir))
            {
                Fixtures.DeleteDirectory(cacheDir);
            }

            // Write with one instance; Dispose is relied on to flush it...
            using (var writer = new PersistentBlobCache(cacheDir))
            {
                writer.Insert("Foo", new byte[] { 1, 2, 3 });
            }

            // ...then read it back through a brand-new instance.
            using (var reader = new PersistentBlobCache(cacheDir))
            {
                var blob = reader.GetAsync("Foo").First();
                Assert.AreEqual(3, blob.Length);
                Assert.AreEqual(1, blob[0]);
            }

            Fixtures.DeleteDirectory(cacheDir);
        }

        // Scratch directory for the roundtrip test: "PBCacheTest" next to the
        // executing (test) assembly.
        static string GetRoundtripDirectory()
        {
            var assemblyDir = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
            return Path.Combine(assemblyDir, "PBCacheTest");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment