Skip to content

Instantly share code, notes, and snippets.

@dealproc
Created October 27, 2022 20:50
Show Gist options
  • Save dealproc/ee627bb51657de102df6c521e6b2268f to your computer and use it in GitHub Desktop.
Save dealproc/ee627bb51657de102df6c521e6b2268f to your computer and use it in GitHub Desktop.
ServiceBase for ReactiveDomain
/// <summary>
/// Base class for services that receive messages through an internal queue and in-memory bus.
/// The constructor inspects the concrete type's interfaces and subscribes its
/// <c>IHandleCommand&lt;T&gt;</c> and <c>IHandle&lt;T&gt;</c> implementations; listeners created
/// via the <c>Start</c> overloads feed their event streams into the internal queue.
/// </summary>
public class ServiceBase : IHandle<IMessage>,
IPublisher, IDisposable {
// Factory invoked by AddNewListener each time a new listener is required.
private readonly Func<IListener> _getListener;
// Listeners created so far; every access in this class takes a lock on the list itself.
private readonly List<IListener> _listeners = new();
// Internal bus that handlers are subscribed to; exposed to derived classes as EventStream.
private readonly InMemoryBus _bus;
// Queue in front of _bus; Publish/Handle forward to it and listeners' event streams are subscribed to it.
private readonly QueuedHandler _queue;
// Subscriptions taken on the incoming bus by the constructor; released in Dispose.
private readonly List<IDisposable> _disposables = new();
/// <summary>
/// Creates the service, obtaining its listeners from <paramref name="connection"/>.
/// Delegates to the factory-based constructor with a factory that calls
/// <c>connection.GetQueuedListener(name)</c> each time a listener is needed.
/// </summary>
/// <param name="name">Service name; also passed to <c>GetQueuedListener</c>.</param>
/// <param name="inBus">Bus on which command handlers are subscribed.</param>
/// <param name="outBus">Publisher handed to the command handlers created for this service.</param>
/// <param name="connection">Connection used to create queued listeners on demand.</param>
public ServiceBase(
    string name,
    ISubscriber inBus,
    IPublisher outBus,
    IConfiguredConnection connection)
    : this(name, inBus, outBus, getListener: () => connection.GetQueuedListener(name)) {
}
/// <summary>
/// Creates the service, starts its internal queue, and subscribes every handler interface
/// implemented by the concrete type.
/// </summary>
/// <remarks>
/// For each closed generic interface of the runtime type (other than those closed over <c>IMessage</c>):
/// <list type="bullet">
/// <item><c>IHandleCommand&lt;T&gt;</c>: a <c>WideningHandler&lt;T, IMessage&gt;</c> wrapping this instance is
/// subscribed on <paramref name="inBus"/>, and a <c>CommandHandler&lt;T&gt;</c> is subscribed on the internal
/// event stream.</item>
/// <item><c>IHandle&lt;T&gt;</c>: this instance is subscribed directly on the internal event stream.</item>
/// </list>
/// Subscriptions are made through reflection because the message type is only known at run time.
/// </remarks>
/// <param name="name">Service name; used to label the internal bus and queue.</param>
/// <param name="inBus">Bus on which command handlers are subscribed.</param>
/// <param name="outBus">Publisher passed to each <c>CommandHandler&lt;T&gt;</c> that is created.</param>
/// <param name="getListener">Factory invoked whenever a new listener is needed.</param>
/// <exception cref="InvalidOperationException">
/// A bus type does not expose exactly one public generic <c>Subscribe</c> method taking two parameters.
/// </exception>
public ServiceBase(string name, ISubscriber inBus, IPublisher outBus, Func<IListener> getListener) {
    Ensure.NotNullOrEmpty(name, nameof(name));
    Ensure.NotNull(inBus, nameof(inBus));
    Ensure.NotNull(getListener, nameof(getListener));
    _getListener = getListener;
    _bus = new InMemoryBus($"{this.GetType().Name}:{name} bus", watchSlowMsg: false);
    _queue = new QueuedHandler(_bus, $"{this.GetType().Name}:{name} queue");
    _queue.Start();

    // The Subscribe<T> lookups do not depend on the interface being processed, so each one is
    // resolved at most once - and only when a matching handler interface actually needs it.
    var inBusSubscribe = new Lazy<System.Reflection.MethodInfo>(() => FindSubscribeMethod(inBus.GetType()));
    var eventStreamSubscribe = new Lazy<System.Reflection.MethodInfo>(() => FindSubscribeMethod(EventStream.GetType()));

    foreach (var interfaceType in GetType().GetInterfaces()) {
        if (!interfaceType.IsGenericType) {
            continue;
        }
        var genericTypeDefinition = interfaceType.GetGenericTypeDefinition();
        var dataType = interfaceType.GetGenericArguments()[0];
        // IHandle<IMessage> is implemented by this base class itself (it forwards to the queue); skip it.
        if (dataType == typeof(IMessage)) {
            continue;
        }
        if (genericTypeDefinition == typeof(IHandleCommand<>)) {
            // subscribes to the "in bus".
            var concreteWideningCommandHandler = typeof(WideningHandler<,>).MakeGenericType(dataType, typeof(IMessage));
            var instantiatedWideningCommandHandler = Activator.CreateInstance(concreteWideningCommandHandler, this);
            var wideningToCall = inBusSubscribe.Value.MakeGenericMethod(dataType);
            // Keep the subscription so it can be released in Dispose.
            _disposables.Add((IDisposable)wideningToCall.Invoke(inBus, new[] { instantiatedWideningCommandHandler, true }));
            // subscribes to the internal EventStream.
            var concreteCommandHandler = typeof(CommandHandler<>).MakeGenericType(dataType);
            var instantiatedCommandHandler = Activator.CreateInstance(concreteCommandHandler, outBus, this);
            var streamToCall = eventStreamSubscribe.Value.MakeGenericMethod(dataType);
            streamToCall.Invoke(EventStream, new[] { instantiatedCommandHandler, true });
        } else if (genericTypeDefinition == typeof(IHandle<>)) {
            // subscribes to the internal EventStream.
            var streamToCall = eventStreamSubscribe.Value.MakeGenericMethod(dataType);
            streamToCall.Invoke(EventStream, new object[] { this, true });
        }
    }
}

// Finds the single public generic Subscribe method (one type argument, two parameters) on the given bus type.
private static System.Reflection.MethodInfo FindSubscribeMethod(Type busType) {
    return busType.GetMethods()
        .Single(m => m.Name.Equals("Subscribe") &&
                     m.GetGenericArguments().Length == 1 &&
                     m.GetParameters().Length == 2);
}
/// <summary>
/// Creates a listener through the configured factory, records it for later disposal,
/// and subscribes the internal queue to everything on the listener's event stream.
/// </summary>
/// <returns>The newly created listener.</returns>
private IListener AddNewListener() {
    var listener = _getListener();

    // Track the listener first so Dispose can release it.
    lock (_listeners) {
        _listeners.Add(listener);
    }

    var stream = listener.EventStream;
    stream.SubscribeToAll(_queue);
    return listener;
}
/// <summary>
/// Takes a snapshot of every tracked listener as a (stream name, position) pair.
/// </summary>
/// <returns>A new list with one tuple per listener, built while holding the listener lock.</returns>
protected List<Tuple<string, long>> GetCheckpoint() {
    lock (_listeners) {
        var checkpoints = new List<Tuple<string, long>>(_listeners.Count);
        foreach (var listener in _listeners) {
            checkpoints.Add(new Tuple<string, long>(listener.StreamName, listener.Position));
        }
        return checkpoints;
    }
}
/// <summary>
/// The internal bus, exposed as a subscriber so handlers can be attached to it.
/// </summary>
protected ISubscriber EventStream {
    get { return _bus; }
}
/// <summary>
/// Creates a new listener and starts it on the named stream.
/// </summary>
/// <param name="stream">Name of the stream to listen to.</param>
/// <param name="checkpoint">Checkpoint forwarded to the listener.</param>
/// <param name="blockUntilLive">Forwarded to the listener's <c>Start</c>.</param>
/// <param name="cancelWaitToken">Forwarded to the listener's <c>Start</c>.</param>
protected void Start(string stream, long? checkpoint, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) {
    var listener = AddNewListener();
    listener.Start(stream, checkpoint, blockUntilLive, cancelWaitToken);
}
/// <summary>
/// Creates a new listener and starts it for the <typeparamref name="TAggregate"/> identified by <paramref name="id"/>.
/// </summary>
/// <typeparam name="TAggregate">Aggregate type forwarded to the listener.</typeparam>
/// <param name="id">Identifier of the aggregate.</param>
/// <param name="checkpoint">Checkpoint forwarded to the listener.</param>
/// <param name="blockUntilLive">Forwarded to the listener's <c>Start</c>.</param>
/// <param name="cancelWaitToken">Forwarded to the listener's <c>Start</c>.</param>
protected void Start<TAggregate>(Guid id, long? checkpoint, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource {
    var listener = AddNewListener();
    listener.Start<TAggregate>(id, checkpoint, blockUntilLive, cancelWaitToken);
}
/// <summary>
/// Creates a new listener and starts it for the <typeparamref name="TAggregate"/> type as a whole
/// (no aggregate id is supplied).
/// </summary>
/// <typeparam name="TAggregate">Aggregate type forwarded to the listener.</typeparam>
/// <param name="checkpoint">Checkpoint forwarded to the listener.</param>
/// <param name="blockUntilLive">Forwarded to the listener's <c>Start</c>.</param>
/// <param name="cancelWaitToken">Forwarded to the listener's <c>Start</c>.</param>
// NOTE(review): the default checkpoint is default(long), i.e. 0, not null - confirm that 0 is the
// intended starting position when callers omit the argument.
protected void Start<TAggregate>(long? checkpoint = default(long), bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource {
    var listener = AddNewListener();
    listener.Start<TAggregate>(checkpoint, blockUntilLive, cancelWaitToken);
}
/// <summary>
/// Reports the <c>Idle</c> state of the internal queue.
/// </summary>
public bool Idle {
    get { return _queue.Idle; }
}
/// <summary>
/// Publishes <paramref name="msg"/> by forwarding it to the internal queue.
/// </summary>
/// <param name="msg">Message to publish.</param>
public void Publish(IMessage msg) {
    var publisher = (IPublisher)_queue;
    publisher.Publish(msg);
}
/// <summary>
/// Handles <paramref name="msg"/> by forwarding it to the internal queue.
/// </summary>
/// <param name="msg">Message to handle.</param>
public void Handle(IMessage msg) {
    var handler = (IHandle<IMessage>)_queue;
    handler.Handle(msg);
}
/// <summary>
/// True once <c>Dispose(bool)</c> has run to completion with <c>disposing</c> set; only this class can set it.
/// </summary>
protected bool Disposed { get; private set; }
/// <summary>
/// Releases the service's resources and tells the garbage collector that finalization is not needed.
/// </summary>
public void Dispose() {
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Releases everything the service owns: tracked listeners, the subscriptions held in
/// <c>_disposables</c>, the internal queue (asked to stop), and the internal bus.
/// </summary>
/// <param name="disposing">
/// When false the method does nothing; when true resources are released, unless the
/// service has already been disposed.
/// </param>
protected virtual void Dispose(bool disposing) {
    if (Disposed) {
        return;
    }
    if (!disposing) {
        return;
    }

    lock (_listeners) {
        foreach (var listener in _listeners) {
            listener?.Dispose();
        }
        _listeners.Clear();
    }

    lock (_disposables) {
        foreach (var subscription in _disposables) {
            subscription?.Dispose();
        }
        _disposables.Clear();
    }

    _queue?.RequestStop();
    _bus?.Dispose();

    // Only marked as disposed once every step above has completed.
    Disposed = true;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment