Skip to content

Instantly share code, notes, and snippets.

@luisrudge
Last active December 11, 2015 06:18
Show Gist options
  • Save luisrudge/4558260 to your computer and use it in GitHub Desktop.
Save luisrudge/4558260 to your computer and use it in GitHub Desktop.
signalr pubsub
class Program
{
static void Main(string[] args)
{
var s = new Subscriber();
const string topic = "Topic.Name";
s.Subscribe<string>(topic, msg => Console.WriteLine("NOTIFIED: " + msg)).Wait();
var p = new Publisher();
var control = "";
Console.WriteLine("Press q to quit");
while (control != "q")
{
control = Console.ReadLine();
p.Publish(topic, control);
}
}
}
public class Publisher : IPublisher, IDisposable
{
private readonly HubConnection _connection;
private IHubProxy _hub;
private bool _disposed;
/// <summary>
/// Inicializa a classe Publisher, inicializando a conexão e o hub do signalr
/// </summary>
public Publisher()
{
_connection = new HubConnection(ConfigurationHelper.GetConfiguration("TecUnica.PubSub.HubUrl", true));
_hub = _connection.CreateHubProxy("PubSubHub");
}
/// <summary>
/// Publica um objeto no tópico
/// </summary>
/// <param name="topic">tópic</param>
/// <param name="data">objeto</param>
public async void Publish(string topic, object data)
{
await _connection.Start();
await _hub.Invoke(Constants.NotifyMethodName, topic, data);
}
/// <summary>
/// Libera recursos
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Libera recursos
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_connection.Disconnect();
_hub = null;
}
}
_disposed = true;
}
}
public class PubSubHub : Hub
{
private readonly static TopicConnectionMapping Connections = new TopicConnectionMapping();
private static readonly ILog Log = LogManager.GetLogger(typeof(PubSubHub));
public void Subscribe(string topic)
{
Log.InfoFormat("Conexão '{0}' assinou o tópico '{1}'", Context.ConnectionId, topic);
Connections.Add(topic, Context.ConnectionId);
}
public void Unsubscribe(string topic)
{
Log.InfoFormat("Conexão '{0}' deixou de assinar o tópico '{1}'", Context.ConnectionId, topic);
Connections.Remove(topic, Context.ConnectionId);
}
public void Notify(string topic, object data)
{
Log.InfoFormat("Notificando assinates do grupo '{0}'", topic);
foreach (var connection in Connections.GetConnections(topic))
{
Log.DebugFormat("Conexão '{0}' foi notificada sobre o tópico'{1}'", Context.ConnectionId, topic);
Clients.Client(connection).Notify(data);
}
}
public override Task OnConnected()
{
Log.InfoFormat("Cliente conectado. ConnectionId: '{0}'", Context.ConnectionId);
return base.OnConnected();
}
public override Task OnDisconnected()
{
Log.InfoFormat("Cliente desconectado. ConnectionId: '{0}'", Context.ConnectionId);
return base.OnDisconnected();
}
public override Task OnReconnected()
{
Log.InfoFormat("Cliente reconectado. ConnectionId: '{0}'", Context.ConnectionId);
return base.OnReconnected();
}
}
public class Subscriber : ISubscriber, IDisposable
{
private readonly HubConnection _connection;
private IHubProxy _hub;
private bool _disposed;
private static Dictionary<string, IDisposable> _disposables = new Dictionary<string, IDisposable>();
/// <summary>
/// Inicializa a classe Subscriber, inicializando a conexão e o hub do signalr
/// </summary>
public Subscriber()
{
_connection = new HubConnection(ConfigurationHelper.GetConfiguration("TecUnica.PubSub.HubUrl", true));
_hub = _connection.CreateHubProxy("PubSubHub");
}
/// <summary>
/// "Assina" um tópico e passa a ação a ser executada quando o tópico for chamado
/// </summary>
/// <typeparam name="T">Tipo de objeto que a ação vai receber</typeparam>
/// <param name="topic">tópico</param>
/// <param name="handler">ação a ser executada</param>
public async Task Subscribe<T>(string topic, Action<T> handler)
{
var disposable = _hub.On(Constants.NotifyMethodName, handler);
_disposables.Add(topic, disposable);
await _connection.Start(); //quits debugger
await _hub.Invoke(Constants.SubscribeMethodName, topic);
}
/// <summary>
/// Quando a conexão como servidor for parada ou interrompida
/// </summary>
/// <param name="handler">método a ser executado</param>
public void OnDisconnect(Action handler)
{
_connection.StateChanged += stateChange => { if (stateChange.NewState.Equals(ConnectionState.Disconnected)) handler.Invoke(); };
}
/// <summary>
/// Para de "assinar" um tópico
/// </summary>
/// <param name="topic"></param>
public async Task Unsubscribe(string topic)
{
if (!_connection.State.Equals(ConnectionState.Connected)) return;
IDisposable disposable;
if (_disposables.TryGetValue(topic, out disposable)) disposable.Dispose();
await _hub.Invoke(Constants.UnsubscribeMethodName, topic);
}
/// <summary>
/// Libera recursos
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Libera recursos
/// </summary>
/// <param name="disposing"></param>
protected async virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
foreach (var disposable in _disposables)
{
await Unsubscribe(disposable.Key);
}
_connection.Disconnect();
_hub = null;
_disposables = null;
}
}
_disposed = true;
}
}
/// <summary>
/// Classe para mapear um tópico com várias conexões
/// </summary>
public class TopicConnectionMapping
{
private readonly Dictionary<string, HashSet<string>> _connections = new Dictionary<string, HashSet<string>>();
/// <summary>
/// Quantidade de tópicos mapeados
/// </summary>
public int Count
{
get
{
return _connections.Count;
}
}
/// <summary>
/// Adiciona uma conexão a um tópico
/// </summary>
/// <param name="topic">tópico</param>
/// <param name="connectionId">conexão</param>
public void Add(string topic, string connectionId)
{
lock (_connections)
{
HashSet<string> connections;
if (!_connections.TryGetValue(topic, out connections))
{
connections = new HashSet<string>();
_connections.Add(topic, connections);
}
lock (connections)
{
connections.Add(connectionId);
}
}
}
/// <summary>
/// Recupera as conexões de um tópico
/// </summary>
/// <param name="topic">tópico</param>
/// <returns>lista de conexões</returns>
public IEnumerable<string> GetConnections(string topic)
{
HashSet<string> connections;
if (_connections.TryGetValue(topic, out connections))
{
return connections;
}
return Enumerable.Empty<string>();
}
/// <summary>
/// Remove uma conexão de um tópico
/// </summary>
/// <param name="topic">tópico</param>
/// <param name="connectionId">conexão</param>
public void Remove(string topic, string connectionId)
{
lock (_connections)
{
HashSet<string> connections;
if (!_connections.TryGetValue(topic, out connections))
{
return;
}
lock (connections)
{
connections.Remove(connectionId);
if (connections.Count == 0)
{
_connections.Remove(topic);
}
}
}
}
}
@ChristianWeyer
Copy link

Do you plan to update this with the final async code?

@ChristianWeyer
Copy link

Also, how did you implement TopicConnectionMapping ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment