Created
October 27, 2022 20:50
-
-
Save dealproc/ee627bb51657de102df6c521e6b2268f to your computer and use it in GitHub Desktop.
ServiceBase for ReactiveDomain
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ServiceBase : IHandle<IMessage>, | |
IPublisher, IDisposable { | |
private readonly Func<IListener> _getListener; | |
private readonly List<IListener> _listeners = new(); | |
private readonly InMemoryBus _bus; | |
private readonly QueuedHandler _queue; | |
private readonly List<IDisposable> _disposables = new(); | |
public ServiceBase(string name, ISubscriber inBus, IPublisher outBus, IConfiguredConnection connection) : this(name, inBus, outBus, () => connection.GetQueuedListener(name)) { | |
} | |
public ServiceBase(string name, ISubscriber inBus, IPublisher outBus, Func<IListener> getListener) { | |
Ensure.NotNullOrEmpty(name, nameof(name)); | |
Ensure.NotNull(inBus, nameof(inBus)); | |
Ensure.NotNull(getListener, nameof(getListener)); | |
_getListener = getListener; | |
_bus = new InMemoryBus($"{this.GetType().Name}:{name} bus", watchSlowMsg: false); | |
_queue = new QueuedHandler(_bus, $"{this.GetType().Name}:{name} queue"); | |
_queue.Start(); | |
var interfaceTypes = GetType().GetInterfaces().ToList(); | |
foreach (var interfaceType in interfaceTypes) { | |
if (!interfaceType.IsInterface || !interfaceType.IsGenericType) continue; | |
var genericTypeDefinition = interfaceType.GetGenericTypeDefinition(); | |
var dataType = interfaceType.GetGenericArguments()[0]; | |
if (dataType == typeof(IMessage)) continue; | |
if (genericTypeDefinition == typeof(IHandleCommand<>)) { | |
// subscribes to the "in bus". | |
var concreteWideningCommandHandler = typeof(WideningHandler<,>).MakeGenericType(dataType, typeof(IMessage)); | |
var instantiatedWideningCommandHandler = Activator.CreateInstance(concreteWideningCommandHandler, this); | |
var wideningHandlerMethod = inBus.GetType().GetMethods() | |
.Single(m => m.Name.Equals("Subscribe") && | |
m.GetGenericArguments().Length == 1 && | |
m.GetParameters().Length == 2); | |
var wideningToCall = wideningHandlerMethod.MakeGenericMethod(dataType); | |
_disposables.Add((IDisposable)wideningToCall.Invoke(inBus, new[] { instantiatedWideningCommandHandler, true })); | |
// subscribes to the internal EventStream. | |
var concreteCommandHandler = typeof(CommandHandler<>).MakeGenericType(dataType); | |
var instantiatedCommandHandler = Activator.CreateInstance(concreteCommandHandler, outBus, this); | |
var eventStreamMethod = EventStream.GetType().GetMethods() | |
.Single(m => m.Name.Equals("Subscribe") && | |
m.GetGenericArguments().Length == 1 && | |
m.GetParameters().Length == 2); | |
var streamToCall = eventStreamMethod.MakeGenericMethod(dataType); | |
streamToCall.Invoke(EventStream, new[] { instantiatedCommandHandler, true }); | |
} else if (genericTypeDefinition == typeof(IHandle<>)) { | |
// subscribes to the internal EventStream. | |
var eventStreamMethod = EventStream.GetType().GetMethods() | |
.Single(m => m.Name.Equals("Subscribe") && | |
m.GetGenericArguments().Length == 1 && | |
m.GetParameters().Length == 2); | |
var streamToCall = eventStreamMethod.MakeGenericMethod(dataType); | |
streamToCall.Invoke(EventStream, new object[] { this, true }); | |
} | |
} | |
} | |
private IListener AddNewListener() { | |
var l = _getListener(); | |
lock (_listeners) { | |
_listeners.Add(l); | |
} | |
l.EventStream.SubscribeToAll(_queue); | |
return l; | |
} | |
protected List<Tuple<string, long>> GetCheckpoint() { | |
lock (_listeners) { | |
return _listeners.Select(l => new Tuple<string, long>(l.StreamName, l.Position)).ToList(); | |
} | |
} | |
protected ISubscriber EventStream => _bus; | |
protected void Start(string stream, long? checkpoint, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) | |
=> AddNewListener().Start(stream, checkpoint, blockUntilLive, cancelWaitToken); | |
protected void Start<TAggregate>(Guid id, long? checkpoint, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource | |
=> AddNewListener().Start<TAggregate>(id, checkpoint, blockUntilLive, cancelWaitToken); | |
protected void Start<TAggregate>(long? checkpoint = default(long), bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource | |
=> AddNewListener().Start<TAggregate>(checkpoint, blockUntilLive, cancelWaitToken); | |
public bool Idle => _queue.Idle; | |
public void Publish(IMessage msg) => ((IPublisher)_queue).Publish(msg); | |
public void Handle(IMessage msg) => ((IHandle<IMessage>)_queue).Handle(msg); | |
protected bool Disposed { get; private set; } | |
public void Dispose() { | |
Dispose(true); | |
GC.SuppressFinalize(this); | |
} | |
protected virtual void Dispose(bool disposing) { | |
if (Disposed || !disposing) return; | |
lock (_listeners) { | |
_listeners?.ForEach(l => l?.Dispose()); | |
_listeners?.Clear(); | |
} | |
lock (_disposables) { | |
_disposables?.ForEach(d => d?.Dispose()); | |
_disposables?.Clear(); | |
} | |
_queue?.RequestStop(); | |
_bus?.Dispose(); | |
Disposed = true; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment