Created
June 3, 2016 18:30
-
-
Save TheCloudlessSky/7c60dd6657caea79dc5764ee8779df9e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Collections.Generic; | |
using System.Globalization; | |
using System.Threading.Tasks; | |
using NHibernate.Cache; | |
using NHibernate.Util; | |
using System.Net.Sockets; | |
using StackExchange.Redis; | |
using System.Runtime.Caching; | |
namespace NHibernate.Caches.Redis | |
{ | |
public class RedisCache : ICache | |
{ | |
// Prefix prepended to the region name to build this cache's Redis namespace.
private const string cacheNamespacePrefix = "NHibernate-Cache:";

private static readonly IInternalLogger log = LoggerProvider.LoggerFor(typeof(RedisCache));

// The acquired locks do not need to be distributed into Redis because
// the same ISession will lock/unlock an object.
private readonly MemoryCache acquiredLocks = new MemoryCache("NHibernate.Caches.Redis.RedisCache");

// Returns the value stored at @key only while @key is a member of the set
// of active keys. A member whose value is gone is removed from the set; a
// key that is not a member is deleted and reported as a miss.
private static readonly LuaScript getScript = LuaScript.Prepare(@"
    if redis.call('sismember', @setOfActiveKeysKey, @key) == 1 then
        local result = redis.call('get', @key)
        if not result then
            redis.call('srem', @setOfActiveKeysKey, @key)
        end
        return result
    else
        redis.call('del', @key)
        return nil
    end
");

// Resets the key's time-to-live to @expiration once the remaining
// time-to-live has dropped to @slidingExpiration or below.
private static readonly LuaScript slidingExpirationScript = LuaScript.Prepare(@"
    local pttl = redis.call('pttl', @key)
    if pttl <= tonumber(@slidingExpiration) then
        redis.call('pexpire', @key, @expiration)
        return true
    else
        return false
    end
");

// Registers @key in the set of active keys and stores @value with a
// millisecond expiration.
private static readonly LuaScript putScript = LuaScript.Prepare(@"
    redis.call('sadd', @setOfActiveKeysKey, @key)
    redis.call('set', @key, @value, 'PX', @expiration)
");

// Unregisters @key from the set of active keys and deletes its value.
private static readonly LuaScript removeScript = LuaScript.Prepare(@"
    redis.call('srem', @setOfActiveKeysKey, @key)
    redis.call('del', @key)
");

// Deletes the lock only when it still holds the value written by this
// client. Static and readonly like the other prepared scripts: the script
// text never varies per instance.
private static readonly LuaScript unlockScript = LuaScript.Prepare(@"
    if redis.call('get', @lockKey) == @lockValue then
        return redis.call('del', @lockKey)
    else
        return 0
    end
");

// Help with debugging scripts since exceptions are swallowed with FireAndForget.
//#if DEBUG
private const CommandFlags fireAndForgetFlags = CommandFlags.None;
//#else
// private const CommandFlags fireAndForgetFlags = CommandFlags.FireAndForget;
//#endif

private readonly bool useIndex;
private readonly ConnectionMultiplexer connectionMultiplexer;
private readonly RedisCacheProviderOptions options;
private readonly TimeSpan expiration;
private readonly TimeSpan slidingExpiration;
private readonly TimeSpan lockTimeout;
private readonly TimeSpan acquireLockTimeout;

public string RegionName { get; private set; }

internal RedisNamespace CacheNamespace { get; private set; }

// Lock timeout expressed in Timestamper units (Timestamper.OneMs per
// millisecond).
public int Timeout
{
    get
    {
        // Multiply in 64-bit and clamp: the 32-bit product silently
        // overflows for long lock timeouts.
        long timeoutInTimestamperUnits = (long)Timestamper.OneMs * (long)lockTimeout.TotalMilliseconds;
        return (int)Math.Min(timeoutInTimestamperUnits, int.MaxValue);
    }
}
// Describes one acquired lock: the cache key it protects, the Redis key that
// holds the lock and the value written to that Redis key. All three are set
// only by the constructor.
private class LockData
{
    public LockData(string key, string lockKey, string lockValue)
    {
        Key = key;
        LockKey = lockKey;
        LockValue = lockValue;
    }

    public string Key { get; private set; }

    public string LockKey { get; private set; }

    public string LockValue { get; private set; }

    // Renders all three fields for log messages.
    public override string ToString()
    {
        return string.Concat(
            "{ Key='", Key,
            "', LockKey='", LockKey,
            "', LockValue='", LockValue,
            "' }"
        );
    }
}
// Convenience constructor: wraps the region name in a new
// RedisCacheConfiguration and delegates to the configuration-based
// constructor.
public RedisCache(
    string regionName,
    ConnectionMultiplexer connectionMultiplexer,
    RedisCacheProviderOptions options,
    bool useIndex = true)
    : this(new RedisCacheConfiguration(regionName), connectionMultiplexer, options, useIndex)
{
}
// Builds a cache for one region.
//   configuration         - region name, expirations and lock timeouts (must not be null; validated).
//   connectionMultiplexer - Redis connection used for every operation (must not be null).
//   options               - provider options; a validated shallow clone is kept.
//   useIndex              - when true, operations go through the scripts that
//                           maintain the set of active keys.
public RedisCache(
    RedisCacheConfiguration configuration,
    ConnectionMultiplexer connectionMultiplexer,
    RedisCacheProviderOptions options,
    bool useIndex = true)
{
    // Argument checks run first, in the same order as the parameters.
    var checkedConfiguration = configuration.ThrowIfNull("configuration");
    checkedConfiguration.Validate();

    this.connectionMultiplexer = connectionMultiplexer.ThrowIfNull("connectionMultiplexer");

    var checkedOptions = options.ThrowIfNull("options");
    this.options = checkedOptions.ShallowCloneAndValidate();

    this.useIndex = useIndex;

    RegionName = configuration.RegionName;
    expiration = configuration.Expiration;
    slidingExpiration = configuration.SlidingExpiration;
    lockTimeout = configuration.LockTimeout;
    acquireLockTimeout = configuration.AcquireLockTimeout;

    log.DebugFormat(
        "creating cache: regionName='{0}', expiration='{1}', lockTimeout='{2}', acquireLockTimeout='{3}'",
        RegionName, expiration, lockTimeout, acquireLockTimeout
    );

    CacheNamespace = new RedisNamespace(cacheNamespacePrefix + RegionName);
}
// Returns the next timestamp produced by NHibernate's Timestamper.
public long NextTimestamp()
{
    long timestamp = Timestamper.Next();
    return timestamp;
}
// Serializes the value and stores it under the namespaced cache key with the
// region's expiration. With the index enabled the key is also added to the
// set of active keys (see putScript).
//   key   - cache key; must not be null.
//   value - value to store; must not be null.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
public virtual void Put(object key, object value)
{
    key.ThrowIfNull("key");
    value.ThrowIfNull("value");

    log.DebugFormat("put in cache: regionName='{0}', key='{1}'", RegionName, key);

    try
    {
        var serializedValue = options.Serializer.Serialize(value);
        var cacheKey = CacheNamespace.GetKey(key);
        var db = GetDatabase();

        if (useIndex)
        {
            db.ScriptEvaluate(putScript, new
            {
                key = cacheKey,
                setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey(),
                value = serializedValue,
                // SET ... PX only accepts an integer, so send whole
                // milliseconds instead of a (possibly fractional) double.
                expiration = (long)expiration.TotalMilliseconds
            }, fireAndForgetFlags);
        }
        else
        {
            db.StringSet(cacheKey, serializedValue, expiration, flags: fireAndForgetFlags);
        }
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not put in cache: regionName='{0}', key='{1}'", RegionName, key);

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Put, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to put item in cache. See inner exception.", e);
        }
    }
}
// Reads and deserializes the value stored for the key.
//   key - cache key; must not be null.
// Returns the deserialized value, or null on a cache miss or when a failure
// was handled without rethrowing.
// When a sliding expiration is configured, a hit also runs
// slidingExpirationScript to extend the key's time-to-live.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
public virtual object Get(object key)
{
    // Name the parameter, as Put does.
    key.ThrowIfNull("key");

    log.DebugFormat("get from cache: regionName='{0}', key='{1}'", RegionName, key);

    try
    {
        var cacheKey = CacheNamespace.GetKey(key);
        var db = GetDatabase();

        RedisValue[] resultValues;
        if (useIndex)
        {
            resultValues = (RedisValue[])db.ScriptEvaluate(getScript, new
            {
                key = cacheKey,
                setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey()
            });
        }
        else
        {
            // A read needs its reply. Never apply fireAndForgetFlags here:
            // with FireAndForget the call returns a default value
            // immediately and every lookup would look like a miss.
            resultValues = new[] { db.StringGet(cacheKey, CommandFlags.None) };
        }

        if (resultValues[0].IsNullOrEmpty)
        {
            log.DebugFormat("cache miss: regionName='{0}', key='{1}'", RegionName, key);
            return null;
        }

        var serializedResult = resultValues[0];
        var deserializedValue = options.Serializer.Deserialize(serializedResult);

        if (deserializedValue != null && slidingExpiration != RedisCacheConfiguration.NoSlidingExpiration)
        {
            db.ScriptEvaluate(slidingExpirationScript, new
            {
                key = cacheKey,
                // PEXPIRE only accepts an integer, so send whole
                // milliseconds rather than a double.
                expiration = (long)expiration.TotalMilliseconds,
                slidingExpiration = (long)slidingExpiration.TotalMilliseconds
            }, fireAndForgetFlags);
        }

        return deserializedValue;
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not get from cache: regionName='{0}', key='{1}'", RegionName, key);

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Get, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to get item from cache. See inner exception.", e);
        }

        return null;
    }
}
// Removes the value stored for the key. With the index enabled the key is
// also removed from the set of active keys (see removeScript).
//   key - cache key; must not be null.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
public virtual void Remove(object key)
{
    // Name the parameter, as Put does.
    key.ThrowIfNull("key");

    log.DebugFormat("remove from cache: regionName='{0}', key='{1}'", RegionName, key);

    try
    {
        var cacheKey = CacheNamespace.GetKey(key);
        var db = GetDatabase();

        if (useIndex)
        {
            db.ScriptEvaluate(removeScript, new
            {
                key = cacheKey,
                setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey()
            }, fireAndForgetFlags);
        }
        else
        {
            db.KeyDelete(cacheKey, fireAndForgetFlags);
        }
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not remove from cache: regionName='{0}', key='{1}'", RegionName, key);

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Remove, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to remove item from cache. See inner exception.", e);
        }
    }
}
// Clears every entry of this region.
// With the index enabled only the set of active keys is deleted; getScript
// then treats the remaining values as inactive and deletes them on access.
// Without the index the region's keys are found by scanning one connected
// server (slaves preferred) and deleted in bounded batches.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
public virtual void Clear()
{
    // Keys per scan page and per DEL command. Bounding the DEL keeps a large
    // region from being removed in one command that holds up the server.
    const int batchSize = 1000;

    log.DebugFormat("clear cache: regionName='{0}'", RegionName);

    try
    {
        var db = GetDatabase();

        if (useIndex)
        {
            var setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey();
            db.KeyDelete(setOfActiveKeysKey, fireAndForgetFlags);
        }
        else
        {
            var scanServer = connectionMultiplexer.GetEndPoints()
                .Select(x => connectionMultiplexer.GetServer(x))
                .Where(x => x.IsConnected)
                .OrderByDescending(x => x.IsSlave)
                .FirstOrDefault();

            if (scanServer == null)
            {
                // Thrown inside the try so it goes through OnException like
                // any other failure.
                throw new InvalidOperationException("No connected Redis server is available to scan for keys.");
            }

            var pattern = CacheNamespace.GetKey("") + "*";
            var batch = new List<RedisKey>(batchSize);

            foreach (var keyToDelete in scanServer.Keys(options.Database, pattern, pageSize: batchSize))
            {
                batch.Add(keyToDelete);
                if (batch.Count >= batchSize)
                {
                    db.KeyDelete(batch.ToArray());
                    batch.Clear();
                }
            }

            // Delete whatever is left of the final, partial batch.
            if (batch.Count > 0)
            {
                db.KeyDelete(batch.ToArray());
            }
        }
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not clear cache: regionName='{0}'", RegionName);

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Clear, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to clear cache. See inner exception.", e);
        }
    }
}
// Only logs the request: nothing is torn down here because Redis is
// distributed.
public virtual void Destroy()
{
    log.DebugFormat("destroying cache: regionName='{0}'", RegionName);
}
// Tries to acquire the Redis lock for the key, retrying for as long as the
// configured retry strategy asks for another attempt. When the lock is never
// acquired, options.OnLockFailed is invoked.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
public virtual void Lock(object key)
{
    log.DebugFormat("acquiring cache lock: regionName='{0}', key='{1}'", RegionName, key);

    try
    {
        var lockKey = CacheNamespace.GetLockKey(key);
        var shouldRetry = options.AcquireLockRetryStrategy.GetShouldRetry();

        bool acquired;
        bool tryAgain;
        do
        {
            // The lock value is recalculated on every attempt to ensure a
            // unique value.
            var attempt = new LockData(
                Convert.ToString(key),
                lockKey,
                options.LockValueFactory.GetLockValue()
            );

            acquired = TryAcquireLock(attempt);
            if (acquired)
            {
                tryAgain = false;
            }
            else
            {
                // Let the retry strategy decide whether to go around again.
                var retryArgs = new ShouldRetryAcquireLockArgs(
                    RegionName, attempt.Key, attempt.LockKey,
                    attempt.LockValue, lockTimeout, acquireLockTimeout
                );
                tryAgain = shouldRetry(retryArgs);
            }
        }
        while (tryAgain);

        if (acquired)
        {
            return;
        }

        var lockFailedArgs = new LockFailedEventArgs(
            RegionName, key, lockKey,
            lockTimeout, acquireLockTimeout
        );
        options.OnLockFailed(this, lockFailedArgs);
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not acquire cache lock: regionName='{0}', key='{1}'", RegionName, key);

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Lock, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to lock item in cache. See inner exception.", e);
        }
    }
}
// Makes a single attempt to take the lock: writes the lock value under the
// lock key only when that key does not exist yet, expiring after lockTimeout.
// On success the lock data is remembered locally for the same duration so
// Unlock can find it. Returns true when the lock was taken.
private bool TryAcquireLock(LockData lockData)
{
    // Don't use IDatabase.LockTake() because we don't use the matching
    // LockRelease(). So, avoid any confusion. Besides, LockTake() just
    // calls this anyways.
    var acquired = GetDatabase().StringSet(lockData.LockKey, lockData.LockValue, lockTimeout, When.NotExists);
    if (!acquired)
    {
        return false;
    }

    // It's ok to use Set() instead of Add() because the lock in
    // Redis will cause other clients to wait.
    var localExpiry = DateTime.UtcNow.Add(lockTimeout);
    acquiredLocks.Set(lockData.Key, lockData, absoluteExpiration: localExpiry);
    return true;
}
// Releases the lock previously taken for the key by this cache instance.
// When no locally remembered lock exists, or Redis no longer holds the
// expected lock value, options.OnUnlockFailed is invoked instead.
public virtual void Unlock(object key)
{
    // Remove() rather than Get(): the local entry is discarded either way
    // because the lock is being released.
    var lockData = acquiredLocks.Remove(Convert.ToString(key)) as LockData;

    if (lockData != null)
    {
        log.DebugFormat("releasing cache lock: regionName='{0}', key='{1}', lockKey='{2}', lockValue='{3}'",
            RegionName, lockData.Key, lockData.LockKey, lockData.LockValue
        );
        ReleaseAcquiredLock(key, lockData);
        return;
    }

    log.WarnFormat("attempted to unlock '{0}' but a previous lock was not acquired or timed out", key);
    var notAcquiredArgs = new UnlockFailedEventArgs(
        RegionName, key, lockKey: null, lockValue: null
    );
    options.OnUnlockFailed(this, notAcquiredArgs);
}

// Deletes the lock in Redis via unlockScript and reports the outcome.
// Failures are reported through options.OnException and are rethrown as
// RedisCacheException only when the handler leaves Throw set.
private void ReleaseAcquiredLock(object key, LockData lockData)
{
    try
    {
        // Don't use IDatabase.LockRelease() because it uses watch/unwatch
        // where we prefer an atomic operation (via a script).
        var released = (bool)GetDatabase().ScriptEvaluate(unlockScript, new
        {
            lockKey = lockData.LockKey,
            lockValue = lockData.LockValue
        });

        if (released)
        {
            return;
        }

        log.WarnFormat("attempted to unlock '{0}' but it could not be released (it maybe timed out or was cleared in Redis)", lockData);
        var notReleasedArgs = new UnlockFailedEventArgs(
            RegionName, key, lockData.LockKey, lockData.LockValue
        );
        options.OnUnlockFailed(this, notReleasedArgs);
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not release cache lock: regionName='{0}', key='{1}', lockKey='{2}', lockValue='{3}'",
            RegionName, lockData.Key, lockData.LockKey, lockData.LockValue
        );

        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Unlock, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to unlock item in cache. See inner exception.", e);
        }
    }
}
// Returns the Redis database selected by options.Database from the shared
// connection multiplexer.
private IDatabase GetDatabase()
{
    var databaseIndex = options.Database;
    return connectionMultiplexer.GetDatabase(databaseIndex);
}
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NHibernate.Cache; | |
using StackExchange.Redis; | |
using Xunit; | |
namespace NHibernate.Caches.Redis.Tests | |
{ | |
public class RedisCacheLoadTests | |
{ | |
// Redis database index that the load test flushes and writes to.
private const int testDb = 15;

// Host of the Redis server used by the load test.
private const string testHost = "localhost";

// Cache operations slower than this many milliseconds are logged.
private const int slowOperationThresholdMilliseconds = 10;

// Number of entries written before the timed part of the test starts.
private const int numberOfKeys = 500000;

// Messages collected from the concurrent tasks; written out at the end of a run.
private readonly ConcurrentQueue<string> logQueue = new ConcurrentQueue<string>();
// Connects to the test Redis server on port 6379 with admin commands
// allowed, connection retry instead of abort, and a 5000 ms sync timeout.
private static ConnectionMultiplexer CreateConnectionMultiplexer()
{
    const int testPort = 6379;
    return ConnectionMultiplexer.Connect(
        testHost + ":" + testPort + ",allowAdmin=true,abortConnect=false,syncTimeout=5000");
}
// Queues a message; the queue is written to the console at the end of Run.
private void Log(string message)
{
    this.logQueue.Enqueue(message);
}
// Runs the load scenario with the set-of-active-keys index enabled.
[Fact]
void has_index()
{
    const bool withIndex = true;
    Run(withIndex);
}
// Runs the load scenario with the set-of-active-keys index disabled.
[Fact]
void no_index()
{
    const bool withIndex = false;
    Run(withIndex);
}
// Load scenario: flushes the test database, fills region1 with numberOfKeys
// entries, then runs two read/write tasks per region for five seconds while
// region1 is cleared after a one second delay. Collected log messages are
// written to the console when the run ends.
//   useIndex - forwarded to both RedisCache instances.
private void Run(bool useIndex)
{
    // Dispose the multiplexer so each test releases its Redis connections.
    using (var mux = CreateConnectionMultiplexer())
    {
        try
        {
            var masterServer = mux.GetServer(mux.GetEndPoints().First());
            masterServer.FlushDatabase(testDb);

            // NOTE(review): the RedisCache(string, ...) constructor builds its own
            // RedisCacheConfiguration from the region name and the visible RedisCache
            // code never reads options.CacheConfigurations, so the expirations set
            // below appear not to take effect - confirm.
            var cache1 = new RedisCache("region1", mux, new RedisCacheProviderOptions()
            {
                Database = testDb,
                CacheConfigurations = new[] { new RedisCacheConfiguration("region1") { Expiration = TimeSpan.FromSeconds(1) } }
            }, useIndex);

            var cache2 = new RedisCache("region2", mux, new RedisCacheProviderOptions()
            {
                Database = testDb,
                CacheConfigurations = new[] { new RedisCacheConfiguration("region2") { Expiration = TimeSpan.FromSeconds(60) } }
            }, useIndex);

            Setup(cache1, numberOfKeys);

            var infoKeyspace = masterServer.Info("Keyspace");
            Log($"Keyspace: {infoKeyspace.FirstOrDefault(x => x.Key == "Keyspace")?.FirstOrDefault(x => x.Key == "db" + testDb)}");

            var taskReadWrite1_A = Task.Run(() => CacheReadWrite(cache1, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite1_B = Task.Run(() => CacheReadWrite(cache1, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite2_A = Task.Run(() => CacheReadWrite(cache2, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite2_B = Task.Run(() => CacheReadWrite(cache2, duration: TimeSpan.FromSeconds(5)));
            var taskClear = Task.Run(() => CacheClear(cache1, delay: TimeSpan.FromSeconds(1)));

            Task.WaitAll(taskReadWrite1_A, taskReadWrite1_B, taskReadWrite2_A, taskReadWrite2_B, taskClear);
        }
        finally
        {
            // Print what was collected even when a task failed, so the
            // timings leading up to the failure are not lost.
            Console.WriteLine(String.Join("\n", logQueue));
        }
    }
}
// Writes numberOfKeys entries with random GUID keys and values into the
// cache and logs how long that took.
private void Setup(ICache cache, int numberOfKeys)
{
    var timer = Stopwatch.StartNew();

    var remaining = numberOfKeys;
    while (remaining > 0)
    {
        var entryKey = Guid.NewGuid();
        var entryValue = Guid.NewGuid();
        cache.Put(entryKey, entryValue);
        remaining--;
    }

    timer.Stop();
    Log($"Setup took {timer.Elapsed.TotalSeconds:F}s");
}
// Repeats a Get followed by a Put of a fresh GUID key (stored as its own
// value) until the duration has elapsed. Each operation is timed separately
// and logged when it exceeds slowOperationThresholdMilliseconds; the total
// time is logged at the end.
private void CacheReadWrite(ICache cache, TimeSpan duration)
{
    var total = Stopwatch.StartNew();

    while (total.Elapsed < duration)
    {
        var key = Guid.NewGuid();

        var opTime = Stopwatch.StartNew();
        cache.Get(key);
        opTime.Stop();
        var getWasSlow = opTime.ElapsedMilliseconds > slowOperationThresholdMilliseconds;
        if (getWasSlow)
        {
            Log($"An ICache.Get for '{cache.RegionName}' was slow ({opTime.Elapsed.TotalMilliseconds:F}ms)");
        }

        // A fresh stopwatch measures the write on its own.
        opTime = Stopwatch.StartNew();
        cache.Put(key, key);
        opTime.Stop();
        var putWasSlow = opTime.ElapsedMilliseconds > slowOperationThresholdMilliseconds;
        if (putWasSlow)
        {
            Log($"An ICache.Put for '{cache.RegionName}' was slow ({opTime.Elapsed.TotalMilliseconds:F}ms)");
        }
    }

    total.Stop();
    Log($"ReadWrite for {cache.RegionName} took {total.Elapsed.TotalSeconds:F}s");
}
// Waits for the delay, then clears the cache and logs how long the clear
// took.
private void CacheClear(ICache cache, TimeSpan delay)
{
    Thread.Sleep(delay);

    Log("Starting clear");

    var timer = new Stopwatch();
    timer.Start();
    cache.Clear();
    timer.Stop();

    Log($"Clear took {timer.Elapsed.TotalMilliseconds:F}ms");
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment