Last active
December 11, 2015 06:18
-
-
Save luisrudge/4558260 to your computer and use it in GitHub Desktop.
signalr pubsub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Console demo: subscribes to a topic and publishes every line typed by the user to it.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Subscribes to the demo topic, then publishes each console line
    /// until the user types "q" or the input stream ends.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        const string topic = "Topic.Name";

        // Both objects own a hub connection, so make sure they are released on exit.
        using (var s = new Subscriber())
        using (var p = new Publisher())
        {
            // Blocking here is intentional: Main cannot be async in this language version,
            // and the subscription must be in place before anything is published.
            s.Subscribe<string>(topic, msg => Console.WriteLine("NOTIFIED: " + msg)).Wait();

            Console.WriteLine("Press q to quit");

            string control;
            // ReadLine returns null at end of input; treat that like "q" instead of looping forever.
            // The quit command itself is not published.
            while ((control = Console.ReadLine()) != null && control != "q")
            {
                p.Publish(topic, control);
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Publishes objects to topics through the "PubSubHub" SignalR hub.
/// </summary>
public class Publisher : IPublisher, IDisposable
{
    private readonly HubConnection _connection;
    private IHubProxy _hub;
    private bool _disposed;

    /// <summary>
    /// Initializes the publisher by creating the SignalR connection and the hub proxy.
    /// The hub URL is read from the "TecUnica.PubSub.HubUrl" configuration entry.
    /// </summary>
    public Publisher()
    {
        _connection = new HubConnection(ConfigurationHelper.GetConfiguration("TecUnica.PubSub.HubUrl", true));
        _hub = _connection.CreateHubProxy("PubSubHub");
    }

    /// <summary>
    /// Publishes an object to a topic without waiting for completion (fire-and-forget).
    /// Failures cannot be observed by the caller; use <see cref="PublishAsync"/> when
    /// the outcome matters.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Object to publish.</param>
    public async void Publish(string topic, object data)
    {
        await PublishAsync(topic, data).ConfigureAwait(false);
    }

    /// <summary>
    /// Publishes an object to a topic and returns a task that completes once the hub
    /// invocation has finished.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="data">Object to publish.</param>
    /// <returns>A task representing the start of the connection and the hub invocation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
    public async System.Threading.Tasks.Task PublishAsync(string topic, object data)
    {
        if (topic == null)
        {
            throw new ArgumentNullException("topic");
        }

        // Dispose clears the hub proxy; fail with a meaningful exception instead of a null dereference.
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }

        // Start is requested before every publish, as in the original code, so the first
        // publish opens the connection.
        // ConfigureAwait(false): nothing below needs the caller's synchronization context.
        await _connection.Start().ConfigureAwait(false);
        await _hub.Invoke(Constants.NotifyMethodName, topic, data).ConfigureAwait(false);
    }

    /// <summary>
    /// Releases the resources held by the publisher.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by the publisher.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; only then is the connection disconnected.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            _connection.Disconnect();
            _hub = null;
        }

        _disposed = true;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// SignalR hub that keeps track of which connections subscribed to which topic and
/// relays published data to the subscribers of that topic.
/// </summary>
public class PubSubHub : Hub
{
    // Shared by all hub instances: topic -> subscribed connection ids.
    private static readonly TopicConnectionMapping Connections = new TopicConnectionMapping();
    private static readonly ILog Log = LogManager.GetLogger(typeof(PubSubHub));

    /// <summary>
    /// Registers the calling connection as a subscriber of the topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    public void Subscribe(string topic)
    {
        Log.InfoFormat("Conexão '{0}' assinou o tópico '{1}'", Context.ConnectionId, topic);
        Connections.Add(topic, Context.ConnectionId);
    }

    /// <summary>
    /// Removes the calling connection from the subscribers of the topic.
    /// </summary>
    /// <param name="topic">Topic to unsubscribe from.</param>
    public void Unsubscribe(string topic)
    {
        Log.InfoFormat("Conexão '{0}' deixou de assinar o tópico '{1}'", Context.ConnectionId, topic);
        Connections.Remove(topic, Context.ConnectionId);
    }

    /// <summary>
    /// Sends the data to every connection currently subscribed to the topic by invoking
    /// the client-side Notify method on each of them.
    /// </summary>
    /// <param name="topic">Topic being published to.</param>
    /// <param name="data">Payload forwarded to the subscribers.</param>
    public void Notify(string topic, object data)
    {
        Log.InfoFormat("Notificando assinates do grupo '{0}'", topic);
        foreach (var connection in Connections.GetConnections(topic))
        {
            // Log the connection being notified, not the publisher's own connection id.
            Log.DebugFormat("Conexão '{0}' foi notificada sobre o tópico'{1}'", connection, topic);
            Clients.Client(connection).Notify(data);
        }
    }

    /// <summary>
    /// Logs the new connection and defers to the base implementation.
    /// </summary>
    public override Task OnConnected()
    {
        Log.InfoFormat("Cliente conectado. ConnectionId: '{0}'", Context.ConnectionId);
        return base.OnConnected();
    }

    /// <summary>
    /// Logs the disconnection and defers to the base implementation.
    /// NOTE(review): topic subscriptions held by the connection are not removed here.
    /// </summary>
    public override Task OnDisconnected()
    {
        Log.InfoFormat("Cliente desconectado. ConnectionId: '{0}'", Context.ConnectionId);
        return base.OnDisconnected();
    }

    /// <summary>
    /// Logs the reconnection and defers to the base implementation.
    /// </summary>
    public override Task OnReconnected()
    {
        Log.InfoFormat("Cliente reconectado. ConnectionId: '{0}'", Context.ConnectionId);
        return base.OnReconnected();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Subscribes handlers to topics exposed by the "PubSubHub" SignalR hub.
/// </summary>
public class Subscriber : ISubscriber, IDisposable
{
    // Upper bound for the best-effort server-side unsubscribe performed while disposing.
    private static readonly TimeSpan DisposeTimeout = TimeSpan.FromSeconds(5);

    private readonly HubConnection _connection;

    // Per-instance map of topic -> local handler registration. It used to be static and was
    // nulled on Dispose, which broke every other Subscriber instance in the process.
    private readonly Dictionary<string, IDisposable> _registrations = new Dictionary<string, IDisposable>();

    private IHubProxy _hub;
    private bool _disposed;

    /// <summary>
    /// Initializes the subscriber by creating the SignalR connection and the hub proxy.
    /// The hub URL is read from the "TecUnica.PubSub.HubUrl" configuration entry.
    /// </summary>
    public Subscriber()
    {
        _connection = new HubConnection(ConfigurationHelper.GetConfiguration("TecUnica.PubSub.HubUrl", true));
        _hub = _connection.CreateHubProxy("PubSubHub");
    }

    /// <summary>
    /// Subscribes to a topic and registers the action to run when a notification arrives.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the handler is registered on the hub's notify method without reference
    /// to the topic, so it looks like it runs for every notification this connection
    /// receives - confirm if one subscriber is meant to hold several topics.
    /// </remarks>
    /// <typeparam name="T">Type of the object the action receives.</typeparam>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="handler">Action to execute on notification.</param>
    /// <returns>A task that completes once the server has been asked to subscribe.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="topic"/> or <paramref name="handler"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">The topic is already subscribed on this instance.</exception>
    public async Task Subscribe<T>(string topic, Action<T> handler)
    {
        if (topic == null)
        {
            throw new ArgumentNullException("topic");
        }

        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        // Check before registering so a duplicate topic does not leak a handler registration.
        if (_registrations.ContainsKey(topic))
        {
            throw new ArgumentException("The topic is already subscribed: " + topic, "topic");
        }

        var registration = _hub.On(Constants.NotifyMethodName, handler);
        _registrations.Add(topic, registration);

        try
        {
            // ConfigureAwait(false): callers may block on the returned task (e.g. .Wait()).
            await _connection.Start().ConfigureAwait(false);
            await _hub.Invoke(Constants.SubscribeMethodName, topic).ConfigureAwait(false);
        }
        catch
        {
            // Roll back the local registration so a failed subscribe can be retried.
            _registrations.Remove(topic);
            registration.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Registers an action to run whenever the connection to the server reaches the
    /// disconnected state.
    /// </summary>
    /// <param name="handler">Method to execute.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void OnDisconnect(Action handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        _connection.StateChanged += stateChange =>
        {
            if (stateChange.NewState == ConnectionState.Disconnected)
            {
                handler();
            }
        };
    }

    /// <summary>
    /// Stops subscribing to a topic. The local handler is always released; the server is
    /// only told to unsubscribe while the connection is in the connected state.
    /// </summary>
    /// <param name="topic">Topic to unsubscribe from.</param>
    /// <returns>A task that completes once the server call (if any) has finished.</returns>
    public async Task Unsubscribe(string topic)
    {
        IDisposable registration;
        if (_registrations.TryGetValue(topic, out registration))
        {
            // Remove the entry so the same topic can be subscribed again later.
            _registrations.Remove(topic);
            registration.Dispose();
        }

        if (_connection.State != ConnectionState.Connected)
        {
            return;
        }

        await _hub.Invoke(Constants.UnsubscribeMethodName, topic).ConfigureAwait(false);
    }

    /// <summary>
    /// Releases the resources held by the subscriber.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by the subscriber.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; only then are topics unsubscribed
    /// and the connection disconnected.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (!disposing)
        {
            return;
        }

        // Copy the keys first: Unsubscribe removes entries from the dictionary.
        var pending = new List<Task>();
        foreach (var topic in new List<string>(_registrations.Keys))
        {
            pending.Add(Unsubscribe(topic));
        }

        try
        {
            // Synchronous and bounded: Dispose must have finished its work when it returns,
            // which the previous async void implementation could not guarantee.
            Task.WaitAll(pending.ToArray(), DisposeTimeout);
        }
        catch (AggregateException)
        {
            // Best effort: a failed server-side unsubscribe must not make Dispose throw.
        }

        _registrations.Clear();
        _connection.Disconnect();
        _hub = null;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Maps a topic to the set of connection ids subscribed to it.
/// All members synchronize on the same lock, so instances can be shared between threads.
/// </summary>
public class TopicConnectionMapping
{
    private readonly Dictionary<string, HashSet<string>> _connections = new Dictionary<string, HashSet<string>>();

    /// <summary>
    /// Number of topics that currently have at least one connection.
    /// </summary>
    public int Count
    {
        get
        {
            lock (_connections)
            {
                return _connections.Count;
            }
        }
    }

    /// <summary>
    /// Adds a connection to a topic, creating the topic entry on first use.
    /// Adding the same connection twice has no further effect.
    /// </summary>
    /// <param name="topic">Topic.</param>
    /// <param name="connectionId">Connection id.</param>
    public void Add(string topic, string connectionId)
    {
        lock (_connections)
        {
            HashSet<string> connections;
            if (!_connections.TryGetValue(topic, out connections))
            {
                connections = new HashSet<string>();
                _connections.Add(topic, connections);
            }

            connections.Add(connectionId);
        }
    }

    /// <summary>
    /// Gets the connections of a topic.
    /// </summary>
    /// <param name="topic">Topic.</param>
    /// <returns>
    /// A snapshot of the connection ids subscribed to the topic, or an empty sequence
    /// when the topic is unknown. Later calls to Add/Remove do not affect the returned
    /// sequence, so it is safe to enumerate while the mapping changes.
    /// </returns>
    public IEnumerable<string> GetConnections(string topic)
    {
        lock (_connections)
        {
            HashSet<string> connections;
            if (_connections.TryGetValue(topic, out connections))
            {
                // Copy under the lock: handing out the live set would let callers
                // enumerate it while another thread mutates it.
                return connections.ToArray();
            }
        }

        return Enumerable.Empty<string>();
    }

    /// <summary>
    /// Removes a connection from a topic. The topic entry itself is dropped once its
    /// last connection is removed. Unknown topics and connections are ignored.
    /// </summary>
    /// <param name="topic">Topic.</param>
    /// <param name="connectionId">Connection id.</param>
    public void Remove(string topic, string connectionId)
    {
        lock (_connections)
        {
            HashSet<string> connections;
            if (!_connections.TryGetValue(topic, out connections))
            {
                return;
            }

            connections.Remove(connectionId);
            if (connections.Count == 0)
            {
                _connections.Remove(topic);
            }
        }
    }
}
Also, how did you implement TopicConnectionMapping?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Do you plan to update this with the final async code?