Created
March 25, 2017 16:05
-
-
Save alexandrebl/5a9d380e65a8448611bf010ee16d998b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Linq.Expressions; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| using Newtonsoft.Json; | |
| using Newtonsoft.Json.Linq; | |
| using StackExchange.Redis; | |
| namespace ConsoleApp1 { | |
/// <summary>
/// Console demo: subscribes to a Redis channel through
/// <see cref="CacheManagerUtility{T}"/> and publishes a timestamped test
/// message roughly once per second until a key is pressed.
/// </summary>
class Program {
    /// <summary>Pause between two published test messages, in milliseconds.</summary>
    private const int PublishIntervalMilliseconds = 1000;

    /// <summary>
    /// Wires console logging to the utility's events, subscribes to the
    /// diagnostic channel and starts the background publisher.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args) {
        var channel = "DiagnosticChannel";
        var cacheManagerUtility = new CacheManagerUtility<string>();

        // Handlers are attached BEFORE subscribing so that neither the first
        // incoming messages nor errors raised while subscribing are lost.
        cacheManagerUtility.OnReceiveMessage += delegate (string message) {
            if (message != null) Console.WriteLine($"Received message: {message}");
        };
        cacheManagerUtility.OnSendMessage += delegate (string message) {
            if (message != null) Console.WriteLine($"Sent message: {message}");
        };
        cacheManagerUtility.OnErrorMessage += delegate (Exception exception) {
            if (exception != null) Console.WriteLine($"Error: {exception}");
        };
        cacheManagerUtility.OnInfoMessage += delegate (string message) {
            // Informational messages get their own label; they used to be
            // printed with the "Sent message" prefix by mistake.
            if (message != null) Console.WriteLine($"Info: {message}");
        };

        cacheManagerUtility.Subscribe(channel);

        // The publisher never finishes, so ask for a dedicated thread instead
        // of occupying a thread-pool worker forever.
        Task.Factory.StartNew(() => {
            while (true) {
                try {
                    cacheManagerUtility.Publish(channel, $"{DateTime.UtcNow:o} - publish test message");
                } catch (Exception ex) {
                    Console.WriteLine(ex.ToString());
                }

                // Pace the loop outside the try block so that a failing
                // Publish cannot turn it into a busy loop.
                Thread.Sleep(PublishIntervalMilliseconds);
            }
        }, TaskCreationOptions.LongRunning);

        Console.WriteLine("App initialized");
        Console.ReadKey();
    }
}
| #region CacheUtility | |
/// <summary>
/// Typed publish/subscribe helper on top of the Redis connection provided by
/// <see cref="CacheConnectionManager"/>. Strings are sent as-is; every other
/// value is sent as JSON.
/// </summary>
/// <remarks>
/// Each instance subscribes to the static events of
/// <see cref="CacheConnectionManager"/> and never unsubscribes, so instances
/// stay reachable for the lifetime of the process.
/// </remarks>
/// <typeparam name="T">Type of the messages exchanged on the channel.</typeparam>
public class CacheManagerUtility<T> {
    /// <summary>Pause before a failed subscription attempt is repeated, in milliseconds.</summary>
    private const int SubscribeRetryDelayMilliseconds = 1000;

    private readonly ConnectionMultiplexer _connection;

    /// <summary>
    /// Raised for every message received on a subscribed channel.
    /// </summary>
    public event Action<T> OnReceiveMessage;

    /// <summary>
    /// Raised after a message has been handed to Redis for publication.
    /// </summary>
    public event Action<T> OnSendMessage;

    /// <summary>
    /// Raised when connecting, subscribing, publishing or decoding a message fails.
    /// </summary>
    public event Action<Exception> OnErrorMessage;

    /// <summary>
    /// Raised for informational notifications (connection lost/restored, retries).
    /// </summary>
    public event Action<string> OnInfoMessage;

    /// <summary>
    /// Opens (or reuses) the shared connection to the local Redis server and
    /// relays the connection manager's notifications through this instance's
    /// events.
    /// </summary>
    public CacheManagerUtility() {
        // Configure and open the shared connection.
        CacheConnectionManager.SetConnectionString("127.0.0.1:6379");
        _connection = CacheConnectionManager.OpenConnection(true);

        CacheConnectionManager.OnFailed += delegate (ConnectionFailedEventArgs args) {
            OnInfoMessageHandle("Connection error");
        };
        CacheConnectionManager.OnRestored += delegate (ConnectionFailedEventArgs args) {
            OnInfoMessageHandle("Connection restored");
        };
        CacheConnectionManager.OnError += delegate (Exception exception) {
            // Keep the real failure as InnerException instead of discarding it.
            OnErrorMessageHadle(new Exception("Connection error", exception));
        };
        CacheConnectionManager.OnInfo += OnInfoMessageHandle;
    }

    /// <summary>
    /// Tells whether the underlying connection currently reports itself as connected.
    /// </summary>
    /// <returns><c>true</c> when a connection exists and is connected; otherwise <c>false</c>.</returns>
    public bool IsConnected() {
        // _connection is readonly, so no lock is needed to read it.
        return _connection != null && _connection.IsConnected;
    }

    /// <summary>
    /// Subscribes to a channel; received messages are raised through
    /// <see cref="OnReceiveMessage"/>.
    /// </summary>
    /// <remarks>
    /// If the connection is not ready the error is reported and nothing is
    /// subscribed. If the subscription itself throws, the error is reported
    /// and the attempt is repeated after a short pause.
    /// </remarks>
    /// <param name="channel">Channel name.</param>
    public void Subscribe(string channel) {
        // Loop instead of recursing so that a long outage cannot exhaust the stack.
        while (true) {
            var subscriber = GetSubscriber();
            if (subscriber == null) return; // already reported by GetSubscriber

            try {
                subscriber.Subscribe(channel, (redisChannel, message) => DispatchReceived(message));
                return;
            } catch (Exception ex) {
                OnErrorMessageHadle(ex);
            }

            // Really wait before the next attempt (the pause used to be
            // started on another thread and never awaited).
            Thread.Sleep(SubscribeRetryDelayMilliseconds);
        }
    }

    /// <summary>
    /// Publishes a message on a channel and raises <see cref="OnSendMessage"/>
    /// once it has been handed to Redis.
    /// </summary>
    /// <param name="channel">Channel name.</param>
    /// <param name="obj">Message to publish.</param>
    public void Publish(string channel, T obj) {
        if (!IsConnected()) {
            OnErrorMessageHadle(new Exception("Connection is not ready to publish"));
            return;
        }

        var publisher = GetSubscriber();
        // The connection may have dropped since the check above; in that case
        // GetSubscriber has reported it and nothing must be announced as sent.
        if (publisher == null) return;

        publisher.Publish(channel, ObjToRedisValue(obj));
        OnSendMessageHadle(obj);
    }

    /// <summary>
    /// Decodes a raw Redis message and forwards it to the receive event;
    /// decoding failures are reported through <see cref="OnErrorMessage"/>.
    /// </summary>
    private void DispatchReceived(RedisValue message) {
        T obj;
        try {
            obj = RedisValueToObj(message);
        } catch (Exception ex) {
            // Surface undecodable payloads to the caller instead of letting
            // the exception escape into the Redis callback.
            OnErrorMessageHadle(ex);
            return;
        }
        OnReceiveMessageHadle(obj);
    }

    /// <summary>
    /// Returns the connection's subscriber, or <c>null</c> (after reporting an
    /// error) when the connection is not ready.
    /// </summary>
    private ISubscriber GetSubscriber() {
        if (!IsConnected()) {
            OnErrorMessageHadle(new Exception("Connection is not ready to subcribe"));
            return null;
        }
        return _connection.GetSubscriber();
    }

    /// <summary>
    /// Converts a message to its wire form: strings are sent unchanged,
    /// anything else (including <c>null</c>) is serialized as JSON.
    /// </summary>
    private static RedisValue ObjToRedisValue(T obj) {
        var text = (object)obj as string;
        return text ?? JsonConvert.SerializeObject(obj);
    }

    /// <summary>
    /// Converts a wire value back to <typeparamref name="T"/>, mirroring
    /// <see cref="ObjToRedisValue"/>.
    /// </summary>
    /// <returns>The decoded message, or <c>default(T)</c> when the value is empty.</returns>
    private static T RedisValueToObj(RedisValue redisValue) {
        if (!redisValue.HasValue) return default(T);

        string text = redisValue;
        if (text == null) return default(T);

        object boxed = text;
        var acceptsRawText = boxed is T; // true for string and for types string derives from (e.g. object)

        // Strings are published verbatim, so they must come back verbatim,
        // even when the text happens to look like a JSON object.
        if (typeof(T) == typeof(string)) return (T)boxed;

        // For types that can hold a string (e.g. object), text that is not a
        // JSON object is treated as a plain string.
        if (acceptsRawText && !IsValidJson(text)) return (T)boxed;

        // Everything else was serialized as JSON by the publisher; this also
        // covers JSON arrays and primitives, which are not JSON objects.
        return JsonConvert.DeserializeObject<T>(text);
    }

    /// <summary>
    /// Tells whether the text parses as a JSON object.
    /// </summary>
    private static bool IsValidJson(string json) {
        if (string.IsNullOrEmpty(json)) return false;
        try {
            JObject.Parse(json);
            return true;
        } catch (JsonException) {
            return false;
        }
    }

    /// <summary>Raises <see cref="OnReceiveMessage"/>. The method name is kept as-is because derived classes may override it.</summary>
    /// <param name="obj">Received message.</param>
    protected virtual void OnReceiveMessageHadle(T obj) {
        OnReceiveMessage?.Invoke(obj);
    }

    /// <summary>Raises <see cref="OnSendMessage"/>. The method name is kept as-is because derived classes may override it.</summary>
    /// <param name="obj">Published message.</param>
    protected virtual void OnSendMessageHadle(T obj) {
        OnSendMessage?.Invoke(obj);
    }

    /// <summary>Raises <see cref="OnErrorMessage"/>. The method name is kept as-is because derived classes may override it.</summary>
    /// <param name="ex">Error to report.</param>
    protected virtual void OnErrorMessageHadle(Exception ex) {
        OnErrorMessage?.Invoke(ex);
    }

    /// <summary>Raises <see cref="OnInfoMessage"/>.</summary>
    /// <param name="message">Informational text.</param>
    protected virtual void OnInfoMessageHandle(string message) {
        OnInfoMessage?.Invoke(message);
    }
}
| #endregion | |
| #region Cache Connection Manager | |
/// <summary>
/// Cache connection manager: owns the single, process-wide Redis connection
/// and exposes its notifications as static events.
/// </summary>
public static class CacheConnectionManager {
    /// <summary>Default pause between two connection attempts, in milliseconds.</summary>
    private const int DefaultRetryDelayMilliseconds = 1000;

    /// <summary>
    /// Connection string used by <see cref="OpenConnection"/>.
    /// </summary>
    private static string _connectionString;

    private static ConnectionMultiplexer _connection;
    private static readonly object SyncObj = new object();

    /// <summary>
    /// Raised when the connection reports a failure.
    /// </summary>
    public static event Action<ConnectionFailedEventArgs> OnFailed;

    /// <summary>
    /// Raised when the connection reports that it has been restored.
    /// </summary>
    public static event Action<ConnectionFailedEventArgs> OnRestored;

    /// <summary>
    /// Raised when an attempt to open the connection throws.
    /// </summary>
    public static event Action<Exception> OnError;

    /// <summary>
    /// Raised for informational notifications (waiting / retrying).
    /// </summary>
    public static event Action<string> OnInfo;

    /// <summary>
    /// Opens the shared connection, or returns the one that already exists.
    /// </summary>
    /// <remarks>
    /// Once created, the multiplexer is never replaced: callers keep
    /// references to it, and a multiplexer that loses its link is expected to
    /// reconnect by itself and signal that through
    /// <see cref="OnFailed"/> / <see cref="OnRestored"/>.
    /// </remarks>
    /// <param name="isPersistence">
    /// When <c>true</c>, a failed attempt to create the connection is repeated
    /// (with a pause in between) until it succeeds; when <c>false</c>, a
    /// failed attempt is reported through <see cref="OnError"/> and
    /// <c>null</c> is returned.
    /// </param>
    /// <returns>The shared connection, or <c>null</c> if it could not be created and no retry was requested.</returns>
    public static ConnectionMultiplexer OpenConnection(bool isPersistence) {
        // Loop instead of recursing so that a long outage cannot exhaust the
        // stack, and so that the pause happens outside the lock.
        while (true) {
            var current = _connection;
            if (current != null && current.IsConnected) return current;

            lock (SyncObj) {
                // Re-check under the lock: if a connection exists (possibly
                // created by another thread while this one was waiting), reuse
                // it rather than creating a second multiplexer that would
                // orphan the first one and its event handlers.
                if (_connection != null) return _connection;

                try {
                    var created = ConnectionMultiplexer.Connect(_connectionString);
                    created.ConnectionFailed += delegate (object sender, ConnectionFailedEventArgs args) {
                        OnFailedHandle(args);
                    };
                    created.ConnectionRestored += delegate (object sender, ConnectionFailedEventArgs args) {
                        OnRestoredHandle(args);
                    };
                    _connection = created;
                    return created;
                } catch (Exception ex) {
                    OnErrorHandle(ex);
                    if (!isPersistence) return null;
                }
            }

            WaitBeforeRetry();
        }
    }

    /// <summary>
    /// Announces a retry, blocks for the given time and announces that the
    /// next attempt is starting.
    /// </summary>
    /// <param name="waitMilliseconds">Time to wait before the next attempt, in milliseconds.</param>
    private static void WaitBeforeRetry(int waitMilliseconds = DefaultRetryDelayMilliseconds) {
        OnInfoHandle($"Waitting {waitMilliseconds} ms for retry connection");
        Thread.Sleep(waitMilliseconds);
        OnInfoHandle("Retry connection");
    }

    /// <summary>
    /// Sets the connection string used by the next call to <see cref="OpenConnection"/>.
    /// </summary>
    /// <param name="connectionString">Connection string.</param>
    public static void SetConnectionString(string connectionString) {
        _connectionString = connectionString;
    }

    /// <summary>Raises <see cref="OnFailed"/>.</summary>
    private static void OnFailedHandle(ConnectionFailedEventArgs obj) {
        OnFailed?.Invoke(obj);
    }

    /// <summary>Raises <see cref="OnRestored"/>.</summary>
    private static void OnRestoredHandle(ConnectionFailedEventArgs obj) {
        OnRestored?.Invoke(obj);
    }

    /// <summary>Raises <see cref="OnError"/>.</summary>
    private static void OnErrorHandle(Exception ex) {
        OnError?.Invoke(ex);
    }

    /// <summary>Raises <see cref="OnInfo"/>.</summary>
    private static void OnInfoHandle(string message) {
        OnInfo?.Invoke(message);
    }
}
| #endregion | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment