Created
September 6, 2012 06:27
-
-
Save rpgmaker/3652097 to your computer and use it in GitHub Desktop.
Redis Provider
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Text.RegularExpressions; | |
using System.Runtime.Serialization.Formatters.Binary; | |
using System.IO; | |
using PServiceBus.Core.Runtime; | |
using PServiceBus.Core.Logger; | |
using ServiceStack.Redis; | |
using ServiceStack.Redis.Generic; | |
using PServiceBus.Core.Interface; | |
using PServiceBus.Core.Runtime.Extensions; | |
using System.Collections; | |
using System.Threading.Tasks; | |
using System.Collections.Concurrent; | |
namespace PServiceBus.Redis { | |
public sealed class Provider<TObject> : IHaveEndpoint, IObjectProvider<TObject> where TObject : class { | |
private static Regex _regexp = new Regex(@"(?<HostName>[a-zA-Z0-9 _ .]+)(?:\:)?(?<Port>\d+);UserID=(?<UserID>[a-zA-Z0-9 _ =]+)?;Password=(?<Password>[a-zA-Z0-9 _ =]+)?(?:;Queue=)?(?<Queue>[a-zA-Z0-9 _ =]+)?(?:;)?", | |
RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.Multiline); | |
private object _lockAddObject = new object(); | |
private object _lockRetrieveObject = new object(); | |
private object _lockReadOnlyObject = new object(); | |
private ConcurrentDictionary<string, RedisNativeClient> _clients = new ConcurrentDictionary<string, RedisNativeClient>(); | |
private static ConcurrentDictionary<string, object> _providers = new ConcurrentDictionary<string, object>(); | |
public static Provider<TObject> GetProvider(string endpoint, bool appendTypeNameToEndpoint = false, string endpointAlias = null) { | |
return _providers.GetOrAdd(endpoint, key => | |
new Provider<TObject> | |
{ | |
Endpoint = endpoint, | |
IDFunc = x => Guid.NewGuid(), | |
EndpointAlias = endpointAlias, | |
AppendTypeNameToEndpoint = appendTypeNameToEndpoint | |
}) as Provider<TObject>; | |
} | |
private void Query(Action<RedisNativeClient> action) { | |
try { | |
lock (_lockAddObject) { | |
var client = GetConnection("Add"); | |
action(client); | |
} | |
} catch (Exception ex) { | |
ESBLogger.Log(ex); | |
} | |
} | |
private TReturn Query<TReturn>(Func<RedisNativeClient, TReturn> func) { | |
var result = default(TReturn); | |
try { | |
lock (_lockRetrieveObject) { | |
result = func(GetConnection("Get")); | |
} | |
} catch (Exception ex) { | |
ESBLogger.Log(ex); | |
} | |
return result; | |
} | |
internal class ProviderFactory { | |
public string Queue { get; set; } | |
public int Port { get; set; } | |
public string HostName { get; set; } | |
} | |
private bool _containQueueName = false; | |
private ProviderFactory GetConnectionFactory() { | |
if (!_containQueueName) { | |
Endpoint = (AppendTypeNameToEndpoint ? String.Format("{0};queue={1}", Endpoint, | |
typeof(TObject).FullName.Replace(".", "_")) : Endpoint) + (EndpointAlias ?? string.Empty); | |
_containQueueName = true; | |
} | |
var regMatch = _regexp.Match(Endpoint); | |
var hostName = regMatch.Groups["HostName"].Value; | |
var port = Convert.ToInt32(regMatch.Groups["Port"].Value); | |
var queue = regMatch.Groups["Queue"].Value; | |
return new ProviderFactory() { Queue = queue, HostName = hostName, Port = port }; | |
} | |
private ProviderFactory _provider = null; | |
private ProviderFactory RedisInfo { | |
get { | |
if (_provider == null) | |
_provider = GetConnectionFactory(); | |
return _provider; | |
} | |
} | |
private RedisNativeClient GetRedisConnection() { | |
var info = RedisInfo; | |
return new RedisNativeClient(info.HostName, info.Port); | |
} | |
private RedisNativeClient GetConnection(string name) { | |
var client = default(RedisNativeClient); | |
var info = RedisInfo; | |
if (!_clients.TryGetValue(name, out client)) { | |
client = _clients[name] = GetRedisConnection(); | |
} | |
if (client.HadExceptions){ | |
MethodHelper.Try(() => client.Dispose()); | |
client = _clients[name] = GetRedisConnection(); | |
} | |
return client; | |
} | |
private DateTime _lastSave = DateTime.Now; | |
private void SaveAsync() { | |
var client = GetConnection("Persist"); | |
MethodHelper.Try(() => | |
{ | |
var lastSave = client.LastSave; | |
if (lastSave > _lastSave) { | |
Task.Factory.StartNew(o => | |
{ | |
var redis = o as RedisNativeClient; | |
MethodHelper.Try(() => redis.BgSave()); | |
}, client); | |
_lastSave = lastSave; | |
} | |
}); | |
} | |
public void SaveSync() { | |
var client = GetConnection("Persist"); | |
client.Save(); | |
} | |
public void Clear() { | |
Query(client => | |
{ | |
Delete(this.ToList()); | |
}); | |
} | |
public void Delete() { | |
Clear(); | |
} | |
private byte[] GetID(TObject obj) { | |
return IDFunc(obj).ToByteArray(); | |
} | |
public bool Refresh(TObject obj) { | |
return Add(obj); | |
} | |
public bool Exists(TObject obj) { | |
return Query(client => client.HExists(RedisInfo.Queue, GetID(obj)) == 1); | |
} | |
public ulong Incr(string key) { | |
return Query(client => (ulong)client.Incr(key)); | |
} | |
public ulong Decr(string key) { | |
return Query(client => (ulong)client.Decr(key)); | |
} | |
public TValue Get<TValue>(string key) { | |
return Query(client => { | |
var buffer = client.Get(key); | |
return (TValue)Convert.ChangeType(buffer == null ? "0" : buffer.UTF8ByteToString(), | |
typeof(TValue)); | |
}); | |
} | |
public void Set<TValue>(string key, TValue value) { | |
Query(client => client.Set(key, value.ToString().ToUTF8Bytes())); | |
} | |
public TObject Get(Guid id) { | |
return Query(client => | |
{ | |
var data = client.HGet(RedisInfo.Queue, id.ToByteArray()); | |
return data != null ? data.Deserialize<TObject>() : null; | |
}); | |
} | |
public bool Add(TObject obj) { | |
var success = false; | |
Query(client => | |
{ | |
client.HSet(RedisInfo.Queue, GetID(obj), obj.Serialize()); | |
success = true; | |
}); | |
return success; | |
} | |
public bool Add(IEnumerable<TObject> list) { | |
var success = false; | |
Query(client => | |
{ | |
var count = list.Count(); | |
if (count > 0) { | |
var index = 0; | |
var keys = new byte[count][]; | |
var values = new byte[count][]; | |
foreach (var obj in list) { | |
keys[index] = GetID(obj); | |
values[index] = obj.Serialize(); | |
index++; | |
} | |
client.HMSet(RedisInfo.Queue, keys, values); | |
} | |
success = true; | |
}); | |
return success; | |
} | |
public bool Delete(IEnumerable<TObject> list) { | |
return Query<bool>(client => | |
{ | |
foreach (var obj in list) | |
client.HDel(RedisInfo.Queue, GetID(obj)); | |
return true; | |
}); | |
} | |
public bool Exists(Func<TObject, bool> condition) { | |
return Get(condition) != null; | |
} | |
public TObject Get(Func<TObject, bool> condition) { | |
return this.FirstOrDefault(d => d != null && condition(d)); | |
} | |
public bool Delete(Func<TObject, bool> condition) { | |
var objs = this.Where(d => d != null && condition(d)); | |
return Delete(objs); | |
} | |
public bool Delete(TObject obj) { | |
return Query(client => { | |
var success = client.HDel(RedisInfo.Queue, GetID(obj)) == 1; | |
return success; | |
}); | |
} | |
#region IObjectProvider<TObject> Members | |
public bool Delete(IEnumerable<Func<TObject, bool>> conditions) { | |
var objs = this.Where(d => d != null && conditions.Any(c => c(d))).ToList(); | |
return Delete(objs); | |
} | |
public string Endpoint { get; set; } | |
public string EndpointAlias { get; set; } | |
public bool AppendTypeNameToEndpoint { get; set; } | |
public bool DeleteObjectOnRead { get; set; } | |
public Func<TObject, Guid> IDFunc { get; set; } | |
#endregion | |
#region IEnumerable<TObject> Members | |
public IEnumerator<TObject> GetEnumerator() { | |
lock (_lockReadOnlyObject) { | |
var client = GetConnection("ReadOnly"); | |
var queue = RedisInfo.Queue; | |
var values = client.HGetAll(queue); | |
for (var i = 0; i < values.Length; i += 2) { | |
var key = values[i]; | |
var value = values[i + 1]; | |
if (DeleteObjectOnRead) { | |
client.HDel(queue, key); | |
} | |
yield return value.Deserialize<TObject>(); | |
} | |
} | |
} | |
#endregion | |
#region IEnumerable Members | |
IEnumerator IEnumerable.GetEnumerator() { | |
return GetEnumerator(); | |
} | |
#endregion | |
#region IDisposable Members | |
public void Dispose() { | |
foreach (var client in _clients) | |
MethodHelper.Try(client.Value.Dispose); | |
_clients.Clear(); | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment