Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created September 6, 2012 06:27
Show Gist options
  • Save rpgmaker/3652097 to your computer and use it in GitHub Desktop.
Save rpgmaker/3652097 to your computer and use it in GitHub Desktop.
Redis Provider
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using PServiceBus.Core.Runtime;
using PServiceBus.Core.Logger;
using ServiceStack.Redis;
using ServiceStack.Redis.Generic;
using PServiceBus.Core.Interface;
using PServiceBus.Core.Runtime.Extensions;
using System.Collections;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace PServiceBus.Redis {
public sealed class Provider<TObject> : IHaveEndpoint, IObjectProvider<TObject> where TObject : class {
private static Regex _regexp = new Regex(@"(?<HostName>[a-zA-Z0-9 _ .]+)(?:\:)?(?<Port>\d+);UserID=(?<UserID>[a-zA-Z0-9 _ =]+)?;Password=(?<Password>[a-zA-Z0-9 _ =]+)?(?:;Queue=)?(?<Queue>[a-zA-Z0-9 _ =]+)?(?:;)?",
RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.Multiline);
private object _lockAddObject = new object();
private object _lockRetrieveObject = new object();
private object _lockReadOnlyObject = new object();
private ConcurrentDictionary<string, RedisNativeClient> _clients = new ConcurrentDictionary<string, RedisNativeClient>();
private static ConcurrentDictionary<string, object> _providers = new ConcurrentDictionary<string, object>();
public static Provider<TObject> GetProvider(string endpoint, bool appendTypeNameToEndpoint = false, string endpointAlias = null) {
return _providers.GetOrAdd(endpoint, key =>
new Provider<TObject>
{
Endpoint = endpoint,
IDFunc = x => Guid.NewGuid(),
EndpointAlias = endpointAlias,
AppendTypeNameToEndpoint = appendTypeNameToEndpoint
}) as Provider<TObject>;
}
private void Query(Action<RedisNativeClient> action) {
try {
lock (_lockAddObject) {
var client = GetConnection("Add");
action(client);
}
} catch (Exception ex) {
ESBLogger.Log(ex);
}
}
private TReturn Query<TReturn>(Func<RedisNativeClient, TReturn> func) {
var result = default(TReturn);
try {
lock (_lockRetrieveObject) {
result = func(GetConnection("Get"));
}
} catch (Exception ex) {
ESBLogger.Log(ex);
}
return result;
}
internal class ProviderFactory {
public string Queue { get; set; }
public int Port { get; set; }
public string HostName { get; set; }
}
private bool _containQueueName = false;
private ProviderFactory GetConnectionFactory() {
if (!_containQueueName) {
Endpoint = (AppendTypeNameToEndpoint ? String.Format("{0};queue={1}", Endpoint,
typeof(TObject).FullName.Replace(".", "_")) : Endpoint) + (EndpointAlias ?? string.Empty);
_containQueueName = true;
}
var regMatch = _regexp.Match(Endpoint);
var hostName = regMatch.Groups["HostName"].Value;
var port = Convert.ToInt32(regMatch.Groups["Port"].Value);
var queue = regMatch.Groups["Queue"].Value;
return new ProviderFactory() { Queue = queue, HostName = hostName, Port = port };
}
private ProviderFactory _provider = null;
private ProviderFactory RedisInfo {
get {
if (_provider == null)
_provider = GetConnectionFactory();
return _provider;
}
}
private RedisNativeClient GetRedisConnection() {
var info = RedisInfo;
return new RedisNativeClient(info.HostName, info.Port);
}
private RedisNativeClient GetConnection(string name) {
var client = default(RedisNativeClient);
var info = RedisInfo;
if (!_clients.TryGetValue(name, out client)) {
client = _clients[name] = GetRedisConnection();
}
if (client.HadExceptions){
MethodHelper.Try(() => client.Dispose());
client = _clients[name] = GetRedisConnection();
}
return client;
}
private DateTime _lastSave = DateTime.Now;
private void SaveAsync() {
var client = GetConnection("Persist");
MethodHelper.Try(() =>
{
var lastSave = client.LastSave;
if (lastSave > _lastSave) {
Task.Factory.StartNew(o =>
{
var redis = o as RedisNativeClient;
MethodHelper.Try(() => redis.BgSave());
}, client);
_lastSave = lastSave;
}
});
}
public void SaveSync() {
var client = GetConnection("Persist");
client.Save();
}
public void Clear() {
Query(client =>
{
Delete(this.ToList());
});
}
public void Delete() {
Clear();
}
private byte[] GetID(TObject obj) {
return IDFunc(obj).ToByteArray();
}
public bool Refresh(TObject obj) {
return Add(obj);
}
public bool Exists(TObject obj) {
return Query(client => client.HExists(RedisInfo.Queue, GetID(obj)) == 1);
}
public ulong Incr(string key) {
return Query(client => (ulong)client.Incr(key));
}
public ulong Decr(string key) {
return Query(client => (ulong)client.Decr(key));
}
public TValue Get<TValue>(string key) {
return Query(client => {
var buffer = client.Get(key);
return (TValue)Convert.ChangeType(buffer == null ? "0" : buffer.UTF8ByteToString(),
typeof(TValue));
});
}
public void Set<TValue>(string key, TValue value) {
Query(client => client.Set(key, value.ToString().ToUTF8Bytes()));
}
public TObject Get(Guid id) {
return Query(client =>
{
var data = client.HGet(RedisInfo.Queue, id.ToByteArray());
return data != null ? data.Deserialize<TObject>() : null;
});
}
public bool Add(TObject obj) {
var success = false;
Query(client =>
{
client.HSet(RedisInfo.Queue, GetID(obj), obj.Serialize());
success = true;
});
return success;
}
public bool Add(IEnumerable<TObject> list) {
var success = false;
Query(client =>
{
var count = list.Count();
if (count > 0) {
var index = 0;
var keys = new byte[count][];
var values = new byte[count][];
foreach (var obj in list) {
keys[index] = GetID(obj);
values[index] = obj.Serialize();
index++;
}
client.HMSet(RedisInfo.Queue, keys, values);
}
success = true;
});
return success;
}
public bool Delete(IEnumerable<TObject> list) {
return Query<bool>(client =>
{
foreach (var obj in list)
client.HDel(RedisInfo.Queue, GetID(obj));
return true;
});
}
public bool Exists(Func<TObject, bool> condition) {
return Get(condition) != null;
}
public TObject Get(Func<TObject, bool> condition) {
return this.FirstOrDefault(d => d != null && condition(d));
}
public bool Delete(Func<TObject, bool> condition) {
var objs = this.Where(d => d != null && condition(d));
return Delete(objs);
}
public bool Delete(TObject obj) {
return Query(client => {
var success = client.HDel(RedisInfo.Queue, GetID(obj)) == 1;
return success;
});
}
#region IObjectProvider<TObject> Members
public bool Delete(IEnumerable<Func<TObject, bool>> conditions) {
var objs = this.Where(d => d != null && conditions.Any(c => c(d))).ToList();
return Delete(objs);
}
public string Endpoint { get; set; }
public string EndpointAlias { get; set; }
public bool AppendTypeNameToEndpoint { get; set; }
public bool DeleteObjectOnRead { get; set; }
public Func<TObject, Guid> IDFunc { get; set; }
#endregion
#region IEnumerable<TObject> Members
public IEnumerator<TObject> GetEnumerator() {
lock (_lockReadOnlyObject) {
var client = GetConnection("ReadOnly");
var queue = RedisInfo.Queue;
var values = client.HGetAll(queue);
for (var i = 0; i < values.Length; i += 2) {
var key = values[i];
var value = values[i + 1];
if (DeleteObjectOnRead) {
client.HDel(queue, key);
}
yield return value.Deserialize<TObject>();
}
}
}
#endregion
#region IEnumerable Members
IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
#endregion
#region IDisposable Members
public void Dispose() {
foreach (var client in _clients)
MethodHelper.Try(client.Value.Dispose);
_clients.Clear();
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment