// Source: GitHub Gist alexandrebl/5a9d380e65a8448611bf010ee16d998b
// Author: @alexandrebl - created March 25, 2017 16:05
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
namespace ConsoleApp1 {
/// <summary>
/// Console demo: subscribes to a Redis pub/sub channel and publishes a timestamped
/// test message to that same channel roughly once per second until a key is pressed.
/// </summary>
class Program {
    /// <summary>
    /// Application entry point.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args) {
        var channel = "DiagnosticChannel";
        var cacheManagerUtility = new CacheManagerUtility<string>();

        // Attach every handler BEFORE subscribing so that anything reported while
        // the subscription is being set up is actually printed.
        cacheManagerUtility.OnReceiveMessage += delegate (string message) {
            if (message != null) Console.WriteLine($"Received message: {message}");
        };
        cacheManagerUtility.OnSendMessage += delegate (string message) {
            if (message != null) Console.WriteLine($"Sent message: {message}");
        };
        cacheManagerUtility.OnErrorMessage += delegate (Exception exception) {
            if (exception != null) Console.WriteLine($"Error: {exception}");
        };
        cacheManagerUtility.OnInfoMessage += delegate (string message) {
            // Informational messages get their own label; the previous
            // "Sent message:" prefix was a copy/paste slip.
            if (message != null) Console.WriteLine($"Info: {message}");
        };

        cacheManagerUtility.Subscribe(channel);

        // The publisher loop never ends, so ask for a dedicated thread instead of
        // permanently occupying a thread-pool worker.
        Task.Factory.StartNew(() => {
            while (true) {
                try {
                    cacheManagerUtility.Publish(channel, $"{DateTime.UtcNow:o} - publish test message");
                } catch (Exception ex) {
                    Console.WriteLine(ex.ToString());
                }
                // Sleep outside the try block so a failing Publish cannot turn
                // this loop into a busy spin.
                Thread.Sleep(1000);
            }
        }, TaskCreationOptions.LongRunning);

        Console.WriteLine("App initialized");
        Console.ReadKey();
    }
}
#region CacheUtility
/// <summary>
/// Publishes and receives messages of type <typeparamref name="T"/> over Redis pub/sub,
/// using the connection obtained from <see cref="CacheConnectionManager"/>.
/// </summary>
/// <remarks>
/// String payloads are sent verbatim; every other payload is serialized to JSON with Json.NET.
/// Failures are reported through <see cref="OnErrorMessage"/> instead of being thrown.
/// </remarks>
/// <typeparam name="T">Type of the message payload.</typeparam>
public class CacheManagerUtility<T> {
    /// <summary>
    /// Delay between two attempts to subscribe after a failed attempt.
    /// </summary>
    private const int SubscribeRetryDelayMilliseconds = 1000;

    private readonly ConnectionMultiplexer _connection;

    /// <summary>
    /// Raised for every message received on a subscribed channel.
    /// </summary>
    public event Action<T> OnReceiveMessage;

    /// <summary>
    /// Raised after a message has been handed to Redis for publishing.
    /// </summary>
    public event Action<T> OnSendMessage;

    /// <summary>
    /// Raised when an operation fails (not connected, subscribe/receive error, connection error).
    /// </summary>
    public event Action<Exception> OnErrorMessage;

    /// <summary>
    /// Raised for informational notifications (connection lost/restored, retry progress).
    /// </summary>
    public event Action<string> OnInfoMessage;

    private static readonly object SyncObj = new object();

    /// <summary>
    /// Configures the connection string, opens the connection and relays the
    /// connection manager's notifications to this instance's events.
    /// </summary>
    public CacheManagerUtility() {
        // Configure and open the connection; "true" asks the manager to keep retrying.
        CacheConnectionManager.SetConnectionString("127.0.0.1:6379");
        _connection = CacheConnectionManager.OpenConnection(true);

        // NOTE: these handlers are attached to static events and are never removed,
        // so the manager keeps a reference to every instance that is created.
        CacheConnectionManager.OnFailed += delegate (ConnectionFailedEventArgs args) {
            OnInfoMessageHandle("Connection error");
        };
        CacheConnectionManager.OnRestored += delegate (ConnectionFailedEventArgs args) {
            OnInfoMessageHandle("Connection restored");
        };
        CacheConnectionManager.OnError += delegate (Exception exception) {
            // Keep the original failure as InnerException so its details are not lost.
            OnErrorMessageHadle(new Exception("Connection error", exception));
        };
        CacheConnectionManager.OnInfo += OnInfoMessageHandle;
    }

    /// <summary>
    /// Tells whether the underlying connection currently reports itself as connected.
    /// </summary>
    /// <returns><c>true</c> when a connection exists and is connected; otherwise <c>false</c>.</returns>
    public bool IsConnected() {
        lock (SyncObj) {
            return _connection?.IsConnected ?? false;
        }
    }

    /// <summary>
    /// Subscribes to a channel; each received message is converted to
    /// <typeparamref name="T"/> and raised through <see cref="OnReceiveMessage"/>.
    /// </summary>
    /// <remarks>
    /// When the connection is not ready the error is reported and nothing is subscribed.
    /// When the subscribe call itself throws, the error is reported and the call is
    /// retried after a short delay.
    /// </remarks>
    /// <param name="channel">Channel name.</param>
    public void Subscribe(string channel) {
        // Iterative retry: the previous recursive retry never actually waited (the
        // sleeping task was not awaited) and grew the stack on every failure.
        while (true) {
            var subscriber = GetSubscriber();
            if (subscriber == null) return; // GetSubscriber has already reported the error.

            try {
                subscriber.Subscribe(channel, (redisChannel, message) => {
                    try {
                        OnReceiveMessageHadle(RedisValueToObj(message));
                    } catch (Exception ex) {
                        // A payload that cannot be converted must be reported, not lost.
                        OnErrorMessageHadle(ex);
                    }
                });
                // No Unsubscribe call here: ISubscriber.Unsubscribe(channel, handler)
                // removes a handler, it does not register an "on unsubscribed" callback.
                return;
            } catch (Exception ex) {
                OnErrorMessageHadle(ex);
                Thread.Sleep(SubscribeRetryDelayMilliseconds);
            }
        }
    }

    /// <summary>
    /// Publishes a message to a channel and raises <see cref="OnSendMessage"/> once it has been sent.
    /// </summary>
    /// <param name="channel">Channel name.</param>
    /// <param name="obj">Payload to publish.</param>
    public void Publish(string channel, T obj) {
        if (!IsConnected()) {
            OnErrorMessageHadle(new Exception("Connection is not ready to publish"));
            return;
        }

        var publisher = GetSubscriber();
        // The connection may have dropped since the check above; GetSubscriber has
        // already reported that, so do not claim the message was sent.
        if (publisher == null) return;

        publisher.Publish(channel, ObjToRedisValue(obj));
        OnSendMessageHadle(obj);
    }

    /// <summary>
    /// Gets the pub/sub interface of the connection.
    /// </summary>
    /// <returns>The subscriber, or <c>null</c> (after reporting an error) when not connected.</returns>
    private ISubscriber GetSubscriber() {
        if (!IsConnected()) {
            OnErrorMessageHadle(new Exception("Connection is not ready to subcribe"));
            return null;
        }
        return _connection.GetSubscriber();
    }

    /// <summary>
    /// Converts a payload to its wire form: strings verbatim, anything else as JSON.
    /// </summary>
    private static RedisValue ObjToRedisValue(T obj) {
        RedisValue redisValue = (obj is string) ? obj.ToString() : JsonConvert.SerializeObject(obj);
        return redisValue;
    }

    /// <summary>
    /// Converts a received value back to <typeparamref name="T"/>; the inverse of <see cref="ObjToRedisValue"/>.
    /// </summary>
    /// <returns>The payload, or <c>default(T)</c> when the value is empty.</returns>
    private static T RedisValueToObj(RedisValue redisValue) {
        if (!redisValue.HasValue) return default(T);

        string raw = redisValue;

        // Strings are published verbatim, so hand them back verbatim. Deserializing a
        // JSON-looking string into a string would throw.
        if (typeof(T) == typeof(string)) return (T)(object)raw;

        if (IsValidJson(raw)) return JsonConvert.DeserializeObject<T>(raw);

        // Not a JSON object: keep the raw text when T can hold a string (T == object).
        if (typeof(T).IsAssignableFrom(typeof(string))) return (T)(object)raw;

        // Typed payloads serialized as a JSON array or primitive (e.g. int, List<int>)
        // still have to go through the deserializer; a cast from string would fail.
        return JsonConvert.DeserializeObject<T>(raw);
    }

    /// <summary>
    /// Tells whether the text parses as a JSON object.
    /// </summary>
    private static bool IsValidJson(string json) {
        try {
            JObject.Parse(json);
            return true;
        } catch (Exception) {
            return false;
        }
    }

    /// <summary>
    /// Raises <see cref="OnReceiveMessage"/>.
    /// </summary>
    protected virtual void OnReceiveMessageHadle(T obj) {
        OnReceiveMessage?.Invoke(obj);
    }

    /// <summary>
    /// Raises <see cref="OnSendMessage"/>.
    /// </summary>
    protected virtual void OnSendMessageHadle(T obj) {
        OnSendMessage?.Invoke(obj);
    }

    /// <summary>
    /// Raises <see cref="OnErrorMessage"/>.
    /// </summary>
    protected virtual void OnErrorMessageHadle(Exception ex) {
        OnErrorMessage?.Invoke(ex);
    }

    /// <summary>
    /// Raises <see cref="OnInfoMessage"/>.
    /// </summary>
    protected virtual void OnInfoMessageHandle(string message) {
        OnInfoMessage?.Invoke(message);
    }
}
#endregion
#region Cache Connection Manager
/// <summary>
/// Owns the single, process-wide Redis connection and reports its state changes.
/// </summary>
public static class CacheConnectionManager {
    /// <summary>
    /// Default delay between two connection attempts.
    /// </summary>
    private const int RetryDelayMilliseconds = 1000;

    /// <summary>
    /// Connection string used when the connection is opened.
    /// </summary>
    private static string _connectionString;
    private static ConnectionMultiplexer _connection;
    private static readonly object SyncObj = new object();

    /// <summary>
    /// Raised when the connection reports a failure.
    /// </summary>
    public static event Action<ConnectionFailedEventArgs> OnFailed;

    /// <summary>
    /// Raised when the connection reports that it has been restored.
    /// </summary>
    public static event Action<ConnectionFailedEventArgs> OnRestored;

    /// <summary>
    /// Raised when an attempt to open the connection throws.
    /// </summary>
    public static event Action<Exception> OnError;

    /// <summary>
    /// Raised with progress information while waiting to retry.
    /// </summary>
    public static event Action<string> OnInfo;

    /// <summary>
    /// Returns the shared connection, opening it on first use.
    /// </summary>
    /// <param name="isPersistence">
    /// When <c>true</c>, a failed attempt is retried (after a delay) until a connection
    /// is opened; when <c>false</c>, a failed attempt is reported and the call returns.
    /// </param>
    /// <returns>
    /// The shared connection; <c>null</c> only when opening failed and
    /// <paramref name="isPersistence"/> is <c>false</c>.
    /// </returns>
    public static ConnectionMultiplexer OpenConnection(bool isPersistence) {
        // A loop replaces the former recursive retry, which nested one stack frame
        // per attempt and slept while holding the lock.
        while (true) {
            var existing = _connection;
            if (existing != null) return existing;

            lock (SyncObj) {
                // Re-check under the lock: another thread may have opened the
                // connection meanwhile. The previous condition was inverted and opened
                // a second multiplexer exactly when one was already connected.
                // An existing multiplexer is reused even while disconnected: callers
                // keep references to it and it re-establishes its own connections
                // (see the StackExchange.Redis documentation).
                if (_connection != null) return _connection;

                try {
                    var connection = ConnectionMultiplexer.Connect(_connectionString);
                    connection.ConnectionFailed += delegate (object sender, ConnectionFailedEventArgs args) {
                        // Only notify; blocking this callback to "reconnect" merely
                        // returned the same instance again.
                        OnFailedHandle(args);
                    };
                    connection.ConnectionRestored += delegate (object sender, ConnectionFailedEventArgs args) {
                        OnRestoredHandle(args);
                    };
                    _connection = connection;
                    return connection;
                } catch (Exception ex) {
                    OnErrorHandle(ex);
                    if (!isPersistence) return _connection;
                }
            }

            // Wait outside the lock so other callers are not blocked during the delay.
            WaitBeforeRetry(RetryDelayMilliseconds);
        }
    }

    /// <summary>
    /// Announces the retry, waits, then announces that the retry is starting.
    /// </summary>
    /// <param name="waitMilliseconds">Time to wait before the next attempt.</param>
    private static void WaitBeforeRetry(int waitMilliseconds) {
        OnInfoHandle($"Waitting {waitMilliseconds} ms for retry connection");
        Thread.Sleep(waitMilliseconds);
        OnInfoHandle("Retry connection");
    }

    /// <summary>
    /// Sets the connection string used by the next attempt to open the connection.
    /// </summary>
    /// <param name="connectionString">Connection string.</param>
    public static void SetConnectionString(string connectionString) {
        _connectionString = connectionString;
    }

    /// <summary>
    /// Raises <see cref="OnFailed"/>.
    /// </summary>
    private static void OnFailedHandle(ConnectionFailedEventArgs obj) {
        OnFailed?.Invoke(obj);
    }

    /// <summary>
    /// Raises <see cref="OnRestored"/>.
    /// </summary>
    private static void OnRestoredHandle(ConnectionFailedEventArgs obj) {
        OnRestored?.Invoke(obj);
    }

    /// <summary>
    /// Raises <see cref="OnError"/>.
    /// </summary>
    private static void OnErrorHandle(Exception ex) {
        OnError?.Invoke(ex);
    }

    /// <summary>
    /// Raises <see cref="OnInfo"/>.
    /// </summary>
    private static void OnInfoHandle(string message) {
        OnInfo?.Invoke(message);
    }
}
#endregion
}
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.