Skip to content

Instantly share code, notes, and snippets.

@wi7a1ian
Last active September 25, 2019 10:05
Show Gist options
  • Save wi7a1ian/7f067c3ddf9d69ec3ba73080fac6f32e to your computer and use it in GitHub Desktop.
Save wi7a1ian/7f067c3ddf9d69ec3ba73080fac6f32e to your computer and use it in GitHub Desktop.
Implementation of Event Aggregator Pattern #csharp c#
/// <summary>
/// Contract of an in-process event aggregator: publishers push messages into the hub and
/// subscribers receive them through callbacks, identified by a <see cref="Guid"/> token.
/// </summary>
public interface IMessageHub : IDisposable
{
/// <summary>
/// Registers a callback that receives the message type and the message itself.
/// In <c>MessageHub</c> it is invoked on every publish, before the subscribers, and a
/// later registration replaces the earlier one.
/// </summary>
/// <param name="onMessage">Callback receiving the published type and message.</param>
void RegisterGlobalHandler(Action<Type, object> onMessage);
/// <summary>
/// Registers a callback that receives the subscription token and the exception.
/// In <c>MessageHub</c> it is invoked when a subscriber's handler throws during publish,
/// and a later registration replaces the earlier one.
/// </summary>
/// <param name="onError">Callback receiving the failing subscription's token and the exception.</param>
void RegisterGlobalErrorHandler(Action<Guid, Exception> onError);
/// <summary>
/// Publishes <paramref name="message"/>. In <c>MessageHub</c> a subscription is considered
/// when its subscribed type is assignable from <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type the message is published as.</typeparam>
/// <param name="message">The message to deliver.</param>
void Publish<T>(T message);
/// <summary>
/// Subscribes <paramref name="action"/> to messages of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Message type the handler accepts.</typeparam>
/// <param name="action">Handler to invoke for matching messages.</param>
/// <returns>Token identifying the subscription; pass it to <see cref="Unsubscribe"/> or <see cref="IsSubscribed"/>.</returns>
Guid Subscribe<T>(Action<T> action);
/// <summary>
/// Removes the subscription identified by <paramref name="token"/>.
/// In <c>MessageHub</c> an unknown token is ignored.
/// </summary>
/// <param name="token">Token previously returned by <see cref="Subscribe{T}"/>.</param>
void Unsubscribe(Guid token);
/// <summary>
/// Tells whether a subscription with the given <paramref name="token"/> is currently registered.
/// </summary>
/// <param name="token">Token previously returned by <see cref="Subscribe{T}"/>.</param>
/// <returns><c>true</c> when the token belongs to a registered subscription.</returns>
bool IsSubscribed(Guid token);
/// <summary>
/// Removes all subscriptions. In <c>MessageHub</c> the global handlers stay registered.
/// </summary>
void ClearSubscriptions();
}
/// <summary>
/// In-process event aggregator. Handlers are registered per message type and invoked
/// synchronously, on the publishing thread, for every published message whose type is
/// assignable to the subscribed type.
/// </summary>
/// <remarks>
/// Each publishing thread works on its own snapshot of the subscription list so that
/// <see cref="Publish{T}"/> does not take a lock on the hot path. Snapshots are never
/// mutated once handed out; a subscription that is removed is flagged inactive instead,
/// so handlers may safely subscribe/unsubscribe from within a handler.
/// </remarks>
public sealed class MessageHub : IMessageHub
{
    private readonly Subscriptions _subscriptions = new Subscriptions();
    private Action<Type, object> _globalHandler;
    private Action<Guid, Exception> _globalErrorHandler;

    /// <summary>
    /// Registers the callback invoked for every published message, before any subscriber.
    /// A later registration replaces the earlier one.
    /// </summary>
    /// <param name="onMessage">Callback receiving the published type and the message.</param>
    /// <exception cref="NullReferenceException"><paramref name="onMessage"/> is null.</exception>
    public void RegisterGlobalHandler(Action<Type, object> onMessage)
    {
        EnsureNotNull(onMessage);
        _globalHandler = onMessage;
    }

    /// <summary>
    /// Registers the callback invoked when a subscriber's handler throws during publish.
    /// A later registration replaces the earlier one.
    /// </summary>
    /// <param name="onError">Callback receiving the failing subscription's token and the exception.</param>
    /// <exception cref="NullReferenceException"><paramref name="onError"/> is null.</exception>
    public void RegisterGlobalErrorHandler(Action<Guid, Exception> onError)
    {
        EnsureNotNull(onError);
        _globalErrorHandler = onError;
    }

    /// <summary>
    /// Delivers <paramref name="message"/> to the global handler (if any) and then to every
    /// active subscription whose subscribed type is assignable from <typeparamref name="T"/>.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by a subscriber are passed to the global error handler when one is
    /// registered and otherwise dropped; they never stop delivery to the remaining subscribers.
    /// Exceptions thrown by the global handler are not caught.
    /// </remarks>
    /// <typeparam name="T">Type the message is published as.</typeparam>
    /// <param name="message">The message to deliver.</param>
    public void Publish<T>(T message)
    {
        var localSubscriptions = _subscriptions.GetTheLatestSubscriptions();
        var msgType = typeof(T);

        _globalHandler?.Invoke(msgType, message);

        for (var idx = 0; idx < localSubscriptions.Count; idx++)
        {
            var subscription = localSubscriptions[idx];

            // The snapshot is immutable, so a subscription removed after the snapshot was
            // taken (possibly by a handler earlier in this very loop) is still present here;
            // the flag keeps it from being invoked.
            if (!subscription.IsActive) { continue; }
            if (!subscription.Type.IsAssignableFrom(msgType)) { continue; }

            try
            {
                subscription.Handle(message);
            }
            catch (Exception e)
            {
                _globalErrorHandler?.Invoke(subscription.Token, e);
            }
        }
    }

    /// <summary>
    /// Subscribes <paramref name="action"/> to messages assignable to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Message type the handler accepts.</typeparam>
    /// <param name="action">Handler to invoke for matching messages.</param>
    /// <returns>Token identifying the new subscription.</returns>
    /// <exception cref="NullReferenceException"><paramref name="action"/> is null.</exception>
    public Guid Subscribe<T>(Action<T> action)
    {
        EnsureNotNull(action);
        return _subscriptions.Register(action);
    }

    /// <summary>Removes the subscription identified by <paramref name="token"/>; unknown tokens are ignored.</summary>
    public void Unsubscribe(Guid token) => _subscriptions.UnRegister(token);

    /// <summary>Tells whether <paramref name="token"/> identifies a currently registered subscription.</summary>
    public bool IsSubscribed(Guid token) => _subscriptions.IsRegistered(token);

    /// <summary>Removes every subscription; the global handlers stay registered.</summary>
    public void ClearSubscriptions() => _subscriptions.Clear(false);

    /// <summary>
    /// Drops both global handlers, removes every subscription and releases the per-thread
    /// snapshot storage. Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        _globalHandler = null;
        _globalErrorHandler = null;
        _subscriptions.Clear(true);
    }

    // NullReferenceException is kept (rather than ArgumentNullException) because existing
    // callers may already catch it; the two types are unrelated, so switching would break them.
    [DebuggerStepThrough]
    private static void EnsureNotNull(object obj)
    {
        if (obj == null) { throw new NullReferenceException(nameof(obj)); }
    }

    /// <summary>A single registered handler together with its token and subscribed type.</summary>
    private sealed class Subscription
    {
        private readonly Action<object> _boxedInvoker;
        private volatile bool _isActive = true;

        /// <param name="type">The subscribed message type.</param>
        /// <param name="token">Token handed back to the subscriber.</param>
        /// <param name="handler">The original <c>Action&lt;TSubscribed&gt;</c> delegate.</param>
        /// <param name="boxedInvoker">Invokes <paramref name="handler"/> with an <see cref="object"/> argument.</param>
        public Subscription(Type type, Guid token, object handler, Action<object> boxedInvoker)
        {
            Type = type;
            Token = token;
            Handler = handler;
            _boxedInvoker = boxedInvoker;
        }

        public Guid Token { get; }
        public Type Type { get; }
        public object Handler { get; }

        /// <summary>False once the subscription has been unregistered or cleared.</summary>
        public bool IsActive => _isActive;

        /// <summary>Marks the subscription as removed so that stale snapshots skip it.</summary>
        public void Deactivate() => _isActive = false;

        /// <summary>Invokes the handler with <paramref name="message"/>.</summary>
        public void Handle<T>(T message)
        {
            // Fast path: exact type, or a reference-type conversion covered by delegate
            // contravariance (e.g. Action<object> used as Action<string>).
            var typedHandler = Handler as Action<T>;
            if (typedHandler != null)
            {
                typedHandler(message);
                return;
            }

            // Variance does not apply to value types (Action<object> is not an Action<int>),
            // so box the message and let the invoker cast it to the subscribed type.
            _boxedInvoker(message);
        }
    }

    /// <summary>
    /// Lock-protected master list of subscriptions plus per-thread, read-only snapshots
    /// that are refreshed whenever the master list has changed.
    /// </summary>
    private sealed class Subscriptions
    {
        private readonly List<Subscription> _allSubscriptions = new List<Subscription>();
        private int _subscriptionsChangeCounter;

        // Per-thread revision number and snapshot. Values of other threads are never
        // inspected, hence no need to track all values.
        private readonly ThreadLocal<int> _localSubscriptionRevision =
            new ThreadLocal<int>(() => 0);
        private readonly ThreadLocal<List<Subscription>> _localSubscriptions =
            new ThreadLocal<List<Subscription>>(() => new List<Subscription>());

        private bool _disposed;

        /// <summary>Adds a subscription for <typeparamref name="T"/> and returns its token.</summary>
        public Guid Register<T>(Action<T> action)
        {
            var key = Guid.NewGuid();
            var subscription = new Subscription(typeof(T), key, action, msg => action((T)msg));

            lock (_allSubscriptions)
            {
                _allSubscriptions.Add(subscription);
                _subscriptionsChangeCounter++;
            }

            return key;
        }

        /// <summary>Removes the subscription with the given token; does nothing when it is unknown.</summary>
        public void UnRegister(Guid token)
        {
            lock (_allSubscriptions)
            {
                var idx = _allSubscriptions.FindIndex(s => s.Token == token);
                if (idx < 0) { return; }

                // Flag instead of editing other threads' snapshots: those lists may be in
                // the middle of being iterated by Publish.
                _allSubscriptions[idx].Deactivate();
                _allSubscriptions.RemoveAt(idx);
                _subscriptionsChangeCounter++;
            }
        }

        /// <summary>
        /// Removes every subscription. With <paramref name="dispose"/> set, also disposes the
        /// per-thread storage; any later call is a no-op.
        /// </summary>
        public void Clear(bool dispose)
        {
            lock (_allSubscriptions)
            {
                if (_disposed) { return; }

                foreach (var subscription in _allSubscriptions)
                {
                    subscription.Deactivate();
                }
                _allSubscriptions.Clear();

                if (dispose)
                {
                    _localSubscriptionRevision.Dispose();
                    _localSubscriptions.Dispose();
                    _disposed = true;
                }
                else
                {
                    _subscriptionsChangeCounter++;
                }
            }
        }

        /// <summary>Tells whether a subscription with the given token is registered.</summary>
        public bool IsRegistered(Guid token)
        {
            lock (_allSubscriptions) { return _allSubscriptions.Exists(s => s.Token == token); }
        }

        /// <summary>
        /// Returns the calling thread's snapshot, refreshing it first when the master list has
        /// changed since the snapshot was taken. The returned list must not be modified.
        /// </summary>
        public List<Subscription> GetTheLatestSubscriptions()
        {
            // CompareExchange(ref x, 0, 0) is used purely as a fenced read of the counter.
            var changeCounterLatestCopy = Interlocked.CompareExchange(
                ref _subscriptionsChangeCounter, 0, 0);

            if (_localSubscriptionRevision.Value == changeCounterLatestCopy)
            {
                return _localSubscriptions.Value;
            }

            List<Subscription> latestSubscriptions;
            lock (_allSubscriptions)
            {
                latestSubscriptions = new List<Subscription>(_allSubscriptions);
            }

            // If the master list changed between reading the counter and copying, the stored
            // revision is merely older than the copy, which costs one extra refresh next time.
            _localSubscriptionRevision.Value = changeCounterLatestCopy;
            _localSubscriptions.Value = latestSubscriptions;
            return latestSubscriptions;
        }
    }
}
// Usage example: create a hub, subscribe a handler, then remove it again via its token.
// The hub is IDisposable, so it is scoped with `using` to release its subscriptions.
using (var hub = new MessageHub())
{
    // Subscribe returns the token that identifies this subscription.
    var subscriptionToken = hub.Subscribe<SomeMessage>(m => DoSthWith(m));

    hub.Unsubscribe(subscriptionToken);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment