Created
March 19, 2013 20:07
-
-
Save yevhen/5199613 to your computer and use it in GitHub Desktop.
The concept of message handling Component and the example of message handler chaining via functional composition
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* somewhere in your Core.CQRS */
// Base class for all ES-based aggregate command handling components; | |
// | |
// NOTE: A "Component" is a logical grouping of message handlers by function.
// Components provide a good place to encapsulate the chaining of cross-cutting concerns
// into a pipeline, and they offer simplified helper methods for registering message handlers.
// | |
// Components are similar to Services, thus they only contain handlers of single type (ie Command Handlers only) | |
// Components operate on envelope (infrastructure) level | |
// | |
// While Message Handlers are open for dependency injection, which makes them extremely easy to unit test,
// Components, by contrast, are self-contained and depend only on external configuration.
// In this respect they act as Composition Roots, effectively shielding consumers
// from the intricacies of Message Handler configuration and composition.
// | |
// Components implement cross-cutting/infrastructure functionality | |
// Individual handlers can easily be unit tested, as they are nothing more than
// static functions that depend only on their input parameters.
// | |
// I know I'm guilty of using inheritance, but I don't care :)
// It works perfectly here, and the concept as a whole has proven itself in the real world.
// Putting the functions below into their own classes doesn't provide much benefit except purity;
// they only make sense in the context of a particular component anyway.
/// <summary>
/// Base class for aggregate command handling components. It wraps every registered
/// command handler in a fixed pipeline of cross-cutting concerns (logging, statistics,
/// event publishing, retries, idempotency, unit of work) built by functional composition.
/// </summary>
public abstract class AggregateCommandHandlingComponentBase : MessageHandlingComponentBase
{
    // Registry the composed handlers are registered with; assigned in Register(ICommandHandlerRegistry).
    ICommandHandlerRegistry registry;

    // Delegate invoked once for every event returned by the pipeline; assigned in Configure.
    Action<EventEnvelope> publisher;

    // Event store connection handed to each unit of work; created in Configure.
    EventStoreConnection store;

    /// <summary>
    /// Stores the event publisher and creates the event store connection from the
    /// "EventStore.Uri" and "EventStore.Port" application settings.
    /// </summary>
    /// <param name="publisher">Delegate that receives each event produced by a handled command.</param>
    /// <returns>This instance, so that calls can be chained.</returns>
    public AggregateCommandHandlingComponentBase Configure(Action<EventEnvelope> publisher)
    {
        this.publisher = publisher;

        // you can pass this externally if you wish
        var address = ConfigurationManager.AppSettings["EventStore.Uri"];
        var port = int.Parse(ConfigurationManager.AppSettings["EventStore.Port"]);
        var ipEndPoint = new IPEndPoint(IPAddress.Parse(address), port);
        store = new EventStoreConnection(ipEndPoint);

        return this;
    }

    /// <summary>
    /// Remembers the registry and runs <see cref="Bootstrap"/> so that the subclass
    /// registers its handlers with it.
    /// </summary>
    /// <param name="commandHandlerRegistry">Registry that receives the composed command handlers.</param>
    public void Register(ICommandHandlerRegistry commandHandlerRegistry)
    {
        registry = commandHandlerRegistry;
        Bootstrap();
    }

    /// <summary>
    /// Registers a handler with both retry behaviours (concurrency conflicts and duplicates) enabled.
    /// </summary>
    /// <param name="handler">Factory that, given a unit of work, returns the action handling <typeparamref name="TCommand"/>.</param>
    protected void RegisterSingleton<TCommand>(Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
    {
        Register<TCommand>(retryOnConcurrency: true, retryOnDuplicates: true, handler: handler);
    }

    /// <summary>
    /// Registers a handler with both retry behaviours disabled.
    /// </summary>
    /// <param name="handler">Factory that, given a unit of work, returns the action handling <typeparamref name="TCommand"/>.</param>
    protected void Register<TCommand>(Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
    {
        Register<TCommand>(retryOnConcurrency: false, retryOnDuplicates: false, handler: handler);
    }

    // Composes the full pipeline around the handler and registers it for TCommand.
    // The outermost stage is listed first; the innermost lambda unwraps the envelope
    // and passes the typed command to the handler.
    void Register<TCommand>(bool retryOnConcurrency, bool retryOnDuplicates, Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
    {
        registry.RegisterCommandHandler<TCommand>(x =>
            PerformLogging(
                CollectStatistics(MessageCategory.Command,
                    PublishEvents(Log, publisher,
                        RetryOnConcurrencyConflicts(retryOnConcurrency, Log,
                            RetryOnDuplicates(retryOnDuplicates, Log,
                                HandleIdempotency(Log,
                                    WithinUnitOfWork(store, uow =>
                                        envelope => handler(uow)((TCommand)envelope.Command) // unwrap message
                                    )))))))
            (x));
    }

    /// <summary>
    /// Pipeline stage that runs <paramref name="next"/> and passes every returned event to <paramref name="publisher"/>.
    /// </summary>
    /// <param name="log">Not used by this stage.</param>
    /// <param name="publisher">Delegate which publishes to the external bus.</param>
    /// <param name="next">Inner stage producing the events for a command envelope.</param>
    public static Action<CommandEnvelope> PublishEvents(Logger log, Action<EventEnvelope> publisher, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
    {
        return envelope =>
        {
            var events = next(envelope);
            foreach (var @event in events)
            {
                // publisher is a plain delegate, so it is invoked directly
                publisher(@event);
            }
        };
    }

    /// <summary>
    /// Pipeline stage that re-runs <paramref name="next"/> when it throws <c>AggregateConcurrencyException</c>.
    /// With <paramref name="retry"/> set, the command is retried until MaxRetries retries have been made,
    /// after which the exception is rethrown; otherwise the exception is logged and rethrown immediately.
    /// </summary>
    /// <param name="retry">Whether concurrency conflicts should be retried.</param>
    /// <param name="log">Logger for retry and failure messages.</param>
    /// <param name="next">Inner stage producing the events for a command envelope.</param>
    public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> RetryOnConcurrencyConflicts(bool retry, Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
    {
        return envelope =>
        {
            var retries = 0;
            while (true) // probably better solution would be is to implement exponential back-off :)
            {
                try
                {
                    return next(envelope);
                }
                catch (AggregateConcurrencyException ex)
                {
                    if (retry)
                    {
                        // NOTE(review): MaxRetries is not declared in this class; presumably a
                        // static/const member of the base class - confirm.
                        if (retries == MaxRetries)
                        {
                            log.Error("Got concurrency conflict on aggregate {0} for command {1} and limit of max retry attempts [{2}] has been reached. Giving up ...", ex.AggregateId, envelope.Id, MaxRetries);
                            throw;
                        }
                        retries++;
                        log.Debug("Got concurrency conflict on aggregate {0} and Retry was designated. Retrying command {1} ... Attempt #{2}", ex.AggregateId, envelope.Id, retries);
                    }
                    else
                    {
                        log.Debug("Got concurrency conflict on aggregate {0}. Command {1}", ex.AggregateId, envelope.Id);
                        throw;
                    }
                }
            }
        };
    }

    /// <summary>
    /// Pipeline stage that, when <paramref name="next"/> throws <c>DuplicateAggregateException</c>,
    /// either runs it one more time (<paramref name="retry"/> set) or logs and rethrows.
    /// An exception thrown by the second attempt is not caught here.
    /// </summary>
    /// <param name="retry">Whether a duplicate aggregate should trigger a single retry.</param>
    /// <param name="log">Logger for retry and failure messages.</param>
    /// <param name="next">Inner stage producing the events for a command envelope.</param>
    public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> RetryOnDuplicates(bool retry, Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
    {
        return envelope =>
        {
            try
            {
                return next(envelope);
            }
            catch (DuplicateAggregateException ex)
            {
                if (retry)
                {
                    log.Debug("Aggregate {0} already exists and Retry was designated. Retrying command {1} ...", ex.AggregateId, envelope.Id);
                    return next(envelope);
                }
                log.Debug("Aggregate {0} already exists. Command {1}", ex.AggregateId, envelope.Id);
                throw;
            }
        };
    }

    /// <summary>
    /// Pipeline stage that turns <c>MessageAlreadyConsumedException</c> from <paramref name="next"/>
    /// into a logged warning and an empty event list.
    /// </summary>
    /// <param name="log">Logger for the "ignored duplicate" warning.</param>
    /// <param name="next">Inner stage producing the events for a command envelope.</param>
    public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> HandleIdempotency(Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
    {
        return envelope =>
        {
            try
            {
                // try consume message, ie try storing message id
                // if it's not unique it will blow
                return next(envelope);
            }
            catch (MessageAlreadyConsumedException)
            {
                log.Warn("Ignored duplicate command {0}", envelope.Id);
                return new EventEnvelope[0];
            }
        };
    }

    /// <summary>
    /// Innermost pipeline stage: creates a fresh session, repository and unit of work for each
    /// envelope, runs the handler against that unit of work and returns the committed events.
    /// </summary>
    /// <param name="store">Event store connection the session is opened on.</param>
    /// <param name="handler">Factory that, given the unit of work, returns the envelope handler.</param>
    public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> WithinUnitOfWork(EventStoreConnection store, Func<AggregateUnitOfWork, Action<CommandEnvelope>> handler)
    {
        return envelope =>
        {
            var session = new EventStoreSession(store, envelope.UserId);
            var repository = new AggregateRepository(session);
            var uow = new AggregateUnitOfWork(repository);
            handler(uow)(envelope);
            // materialize the result of Commit() into an array before returning it
            return uow.Commit().ToArray();
        };
    }

    // this is where subclasses will register domain specific message handlers
    protected abstract void Bootstrap();
}
/* somewhere in your OrderManagement.Service */
/// <summary>
/// Command handling component for order management: binds each order command type
/// to a handler instance created around the unit of work supplied by the pipeline.
/// </summary>
public sealed class AggregateCommandHandlingComponent : AggregateCommandHandlingComponentBase
{
    /// <summary>
    /// Registers one handler per order command through the non-retrying Register overload.
    /// </summary>
    protected override void Bootstrap()
    {
        Register<CreateOrder>(unitOfWork => new CreateOrderCommandHandler(unitOfWork).Handle);
        Register<UpdateOrder>(unitOfWork => new UpdateOrderCommandHandler(unitOfWork).Handle);
        Register<DeleteOrder>(unitOfWork => new DeleteOrderCommandHandler(unitOfWork).Handle);
        Register<ApproveOrder>(unitOfWork => new ApproveOrderCommandHandler(unitOfWork).Handle);
        Register<ShipOrder>(unitOfWork => new ShipOrderCommandHandler(unitOfWork).Handle);
    }
}
/* concrete command handler */
// this could also be collapsed into just a static function
// by injecting the UnitOfWork as a method parameter
/// <summary>
/// Handles the <c>CreateOrder</c> command by registering a new order aggregate
/// with the unit of work it was constructed with.
/// </summary>
public class CreateOrderCommandHandler : Handles<CreateOrder>
{
    readonly AggregateUnitOfWork uow;

    /// <summary>Creates the handler around the given unit of work.</summary>
    /// <param name="uow">Unit of work the new aggregate is registered with.</param>
    // The constructor name must match the class name; the previous name did not.
    public CreateOrderCommandHandler(AggregateUnitOfWork uow)
    {
        this.uow = uow;
    }

    /// <summary>Registers a new <c>OrderAggregate</c> built from the command's order id and data.</summary>
    /// <param name="c">The command to handle.</param>
    public void Handle(CreateOrder c)
    {
        uow.Register(new OrderAggregate(c.OrderId, c.Data));
    }
}
/* somewhere in your FrontEnd application */
// in-memory router that every component registers its handlers with
var bus = new Bus();

// command side: events produced by handled commands are enqueued on mq
var commandComponent = new AggregateCommandHandlingComponent();
commandComponent
    .Configure(evt => mq.Enqueue(evt))
    .Register(bus);

var projectionComponent = new ProjectionHandlingComponent();
projectionComponent.Register(bus);

var queryComponent = new QueryHandlingComponent();
queryComponent.Register(bus);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment