Last active
September 25, 2019 10:05
-
-
Save wi7a1ian/7f067c3ddf9d69ec3ba73080fac6f32e to your computer and use it in GitHub Desktop.
Implementation of the Event Aggregator pattern in C# #csharp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Contract for an in-process message hub (event aggregator): publishers send
/// messages of any type and subscribers receive the messages whose type is
/// compatible with the type they subscribed to.
/// </summary>
public interface IMessageHub : IDisposable
{
    // ---- Publishing ------------------------------------------------------

    /// <summary>Publishes <paramref name="message"/> to the hub.</summary>
    /// <typeparam name="T">The type the message is published as.</typeparam>
    /// <param name="message">The message to deliver.</param>
    void Publish<T>(T message);

    // ---- Subscription management -----------------------------------------

    /// <summary>Subscribes <paramref name="action"/> to messages of type <typeparamref name="T"/>.</summary>
    /// <typeparam name="T">The message type to subscribe to.</typeparam>
    /// <param name="action">The callback to invoke with each matching message.</param>
    /// <returns>A token identifying the subscription; pass it to <see cref="Unsubscribe"/>.</returns>
    Guid Subscribe<T>(Action<T> action);

    /// <summary>Removes the subscription identified by <paramref name="token"/>.</summary>
    /// <param name="token">The token returned by <see cref="Subscribe{T}"/>.</param>
    void Unsubscribe(Guid token);

    /// <summary>Reports whether a subscription with the given <paramref name="token"/> exists.</summary>
    /// <param name="token">The token returned by <see cref="Subscribe{T}"/>.</param>
    /// <returns><c>true</c> when the subscription is registered; otherwise <c>false</c>.</returns>
    bool IsSubscribed(Guid token);

    /// <summary>Removes every subscription from the hub.</summary>
    void ClearSubscriptions();

    // ---- Hub-wide hooks --------------------------------------------------

    /// <summary>Registers a callback that observes published messages together with their type.</summary>
    /// <param name="onMessage">Receives the published type and the message instance.</param>
    void RegisterGlobalHandler(Action<Type, object> onMessage);

    /// <summary>Registers a callback that observes exceptions raised by subscribers.</summary>
    /// <param name="onError">Receives the token of the failing subscription and the exception.</param>
    void RegisterGlobalErrorHandler(Action<Guid, Exception> onError);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// In-process event aggregator. Subscribers register a typed callback and get
/// back a <see cref="Guid"/> token; publishers push messages that are delivered
/// synchronously, on the publishing thread, to every subscription whose type is
/// assignable from the published type.
/// </summary>
/// <remarks>
/// Each publishing thread caches a snapshot of the subscription list and only
/// re-copies it when the shared revision counter has changed. Snapshots are
/// never mutated after creation; removed subscriptions are flagged inactive so
/// that publishers skip them without any list being modified during iteration.
/// </remarks>
public sealed class MessageHub : IMessageHub
{
    private readonly Subscriptions _subscriptions = new Subscriptions();
    private Action<Type, object> _globalHandler;
    private Action<Guid, Exception> _globalErrorHandler;

    /// <summary>
    /// Sets the callback invoked once for every published message, before any
    /// subscriber runs. A later registration replaces the earlier one.
    /// </summary>
    /// <param name="onMessage">Receives the published type and the message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="onMessage"/> is null.</exception>
    public void RegisterGlobalHandler(Action<Type, object> onMessage)
    {
        EnsureNotNull(onMessage, nameof(onMessage));
        _globalHandler = onMessage;
    }

    /// <summary>
    /// Sets the callback invoked when a subscriber throws while handling a
    /// message. A later registration replaces the earlier one.
    /// </summary>
    /// <param name="onError">Receives the failing subscription's token and the exception.</param>
    /// <exception cref="ArgumentNullException"><paramref name="onError"/> is null.</exception>
    public void RegisterGlobalErrorHandler(Action<Guid, Exception> onError)
    {
        EnsureNotNull(onError, nameof(onError));
        _globalErrorHandler = onError;
    }

    /// <summary>
    /// Delivers <paramref name="message"/> to the global handler (if any) and then
    /// to every active subscription whose type is assignable from <typeparamref name="T"/>.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by a subscriber are routed to the global error handler
    /// (or dropped when none is registered) and do not stop delivery to the
    /// remaining subscribers. Exceptions thrown by the global handler or by the
    /// error handler itself propagate to the caller.
    /// </remarks>
    /// <typeparam name="T">The type the message is published as.</typeparam>
    /// <param name="message">The message to deliver.</param>
    /// <exception cref="ObjectDisposedException">
    /// The hub has been disposed (raised by the disposed thread-local caches).
    /// </exception>
    public void Publish<T>(T message)
    {
        var localSubscriptions = _subscriptions.GetTheLatestSubscriptions();
        var msgType = typeof(T);

        _globalHandler?.Invoke(msgType, message);

        for (var idx = 0; idx < localSubscriptions.Count; idx++)
        {
            var subscription = localSubscriptions[idx];

            // Unsubscribed or cleared since the snapshot was taken (possibly by a
            // handler that ran earlier in this very loop): skip instead of
            // mutating the list we are iterating.
            if (!subscription.IsActive) { continue; }
            if (!subscription.Type.IsAssignableFrom(msgType)) { continue; }

            try
            {
                subscription.Handle(message);
            }
            catch (Exception e)
            {
                _globalErrorHandler?.Invoke(subscription.Token, e);
            }
        }
    }

    /// <summary>Subscribes <paramref name="action"/> to messages of type <typeparamref name="T"/>.</summary>
    /// <typeparam name="T">The message type to subscribe to.</typeparam>
    /// <param name="action">The callback to invoke with each matching message.</param>
    /// <returns>A token identifying the new subscription.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public Guid Subscribe<T>(Action<T> action)
    {
        EnsureNotNull(action, nameof(action));
        return _subscriptions.Register(action);
    }

    /// <summary>Removes the subscription identified by <paramref name="token"/>; unknown tokens are ignored.</summary>
    public void Unsubscribe(Guid token) => _subscriptions.UnRegister(token);

    /// <summary>Returns <c>true</c> when a subscription with <paramref name="token"/> is currently registered.</summary>
    public bool IsSubscribed(Guid token) => _subscriptions.IsRegistered(token);

    /// <summary>Removes every subscription while leaving the hub usable.</summary>
    public void ClearSubscriptions() => _subscriptions.Clear(false);

    /// <summary>
    /// Drops both global handlers, removes all subscriptions and releases the
    /// per-thread caches. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        _globalHandler = null;
        _globalErrorHandler = null;
        _subscriptions.Clear(true);
    }

    /// <summary>Throws <see cref="ArgumentNullException"/> naming <paramref name="paramName"/> when <paramref name="obj"/> is null.</summary>
    [DebuggerStepThrough]
    private static void EnsureNotNull(object obj, string paramName)
    {
        if (obj == null) { throw new ArgumentNullException(paramName); }
    }

    /// <summary>A single registered handler together with its message type and token.</summary>
    private sealed class Subscription
    {
        // Invokes the handler through a cast from object; used when the handler
        // cannot be viewed as Action<TPublished>.
        private readonly Action<object> _untypedInvoker;

        // Written under the Subscriptions lock, read lock-free by publishers.
        private volatile bool _active = true;

        public Subscription(Type type, Guid token, object handler, Action<object> untypedInvoker)
        {
            Type = type;
            Token = token;
            Handler = handler;
            _untypedInvoker = untypedInvoker;
        }

        public Guid Token { get; }
        public Type Type { get; }
        public object Handler { get; }

        /// <summary><c>false</c> once the subscription has been unregistered or cleared.</summary>
        public bool IsActive => _active;

        /// <summary>Marks the subscription as removed so in-flight publishers skip it.</summary>
        public void Deactivate() => _active = false;

        /// <summary>Invokes the handler with <paramref name="message"/>.</summary>
        public void Handle<T>(T message)
        {
            // Delegate contravariance lets an Action<Base> be used as an
            // Action<Derived>, but only for reference types. For a value-type
            // message delivered to an object/interface/Nullable subscriber the
            // typed view does not exist, so fall back to the boxed invoker.
            var typedHandler = Handler as Action<T>;
            if (typedHandler != null)
            {
                typedHandler(message);
                return;
            }

            _untypedInvoker(message);
        }
    }

    /// <summary>
    /// Owns the master subscription list and the per-thread snapshots of it.
    /// </summary>
    private sealed class Subscriptions
    {
        // Master list; also serves as the lock guarding itself, the revision
        // counter writes and the disposed flag.
        private readonly List<Subscription> _allSubscriptions = new List<Subscription>();

        // Bumped on every change to the master list.
        private int _subscriptionsChangeCounter;

        // Revision at which the calling thread took its current snapshot.
        private readonly ThreadLocal<int> _localSubscriptionRevision =
            new ThreadLocal<int>(() => 0);

        // The calling thread's snapshot; replaced wholesale, never mutated.
        private readonly ThreadLocal<List<Subscription>> _localSubscriptions =
            new ThreadLocal<List<Subscription>>(() => new List<Subscription>());

        private bool _disposed;

        /// <summary>Adds a subscription for <typeparamref name="T"/> and returns its token.</summary>
        public Guid Register<T>(Action<T> action)
        {
            var key = Guid.NewGuid();
            var subscription = new Subscription(
                typeof(T), key, action, message => action((T)message));

            lock (_allSubscriptions)
            {
                _allSubscriptions.Add(subscription);
                _subscriptionsChangeCounter++;
            }

            return key;
        }

        /// <summary>Removes the subscription with the given token, if present.</summary>
        public void UnRegister(Guid token)
        {
            lock (_allSubscriptions)
            {
                var idx = _allSubscriptions.FindIndex(s => s.Token == token);
                if (idx < 0) { return; }

                // Flag first so publishers holding an older snapshot stop
                // delivering to it immediately.
                _allSubscriptions[idx].Deactivate();
                _allSubscriptions.RemoveAt(idx);
                _subscriptionsChangeCounter++;
            }
        }

        /// <summary>
        /// Removes all subscriptions; when <paramref name="dispose"/> is true the
        /// per-thread caches are disposed as well. Does nothing once disposed.
        /// </summary>
        public void Clear(bool dispose)
        {
            lock (_allSubscriptions)
            {
                if (_disposed) { return; }

                foreach (var subscription in _allSubscriptions)
                {
                    subscription.Deactivate();
                }
                _allSubscriptions.Clear();

                if (dispose)
                {
                    _localSubscriptionRevision.Dispose();
                    _localSubscriptions.Dispose();
                    _disposed = true;
                }
                else
                {
                    _subscriptionsChangeCounter++;
                }
            }
        }

        /// <summary>Returns <c>true</c> when a subscription with the token is in the master list.</summary>
        public bool IsRegistered(Guid token)
        {
            lock (_allSubscriptions) { return _allSubscriptions.Any(s => s.Token == token); }
        }

        /// <summary>
        /// Returns the calling thread's snapshot, refreshing it from the master
        /// list when the revision counter has moved since the last refresh.
        /// </summary>
        public List<Subscription> GetTheLatestSubscriptions()
        {
            // CompareExchange(0, 0) is used as a fenced read of the counter.
            var latestRevision = Interlocked.CompareExchange(
                ref _subscriptionsChangeCounter, 0, 0);
            if (_localSubscriptionRevision.Value == latestRevision)
            {
                return _localSubscriptions.Value;
            }

            List<Subscription> snapshot;
            lock (_allSubscriptions)
            {
                snapshot = _allSubscriptions.ToList();
                // Re-read under the lock so the stored revision matches the copy.
                latestRevision = _subscriptionsChangeCounter;
            }

            _localSubscriptionRevision.Value = latestRevision;
            _localSubscriptions.Value = snapshot;
            return snapshot;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: subscribe to a message type, then cancel the subscription with the
// token that Subscribe returned. The hub is IDisposable, so scope it in `using`.
using (var hub = new MessageHub())
{
    var subscriptionToken = hub.Subscribe<SomeMessage>(m => DoSthWith(m));
    hub.Unsubscribe(subscriptionToken);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment