Created
July 29, 2012 16:27
-
-
Save hyrmn/3200053 to your computer and use it in GitHub Desktop.
My incomplete but evolving understanding of CQRS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reflection; | |
using CommonDomain; | |
using CommonDomain.Core; | |
using CommonDomain.Persistence; | |
using CommonDomain.Persistence.EventStore; | |
using EventStore; | |
using EventStore.Dispatcher; | |
using MemBus; | |
using MemBus.Configurators; | |
using MemBus.Subscribing; | |
using Shouldly; | |
using Xunit; | |
//These are the nuget packages I'm using | |
/*<?xml version="1.0" encoding="utf-8"?> | |
<packages> | |
<package id="CommonDomain" version="1.4.0" targetFramework="net40" /> | |
<package id="EventStore" version="3.0.11326.44" targetFramework="net40" /> | |
<package id="MemBus" version="1.5.4" targetFramework="net40" /> | |
<package id="Shouldly" version="1.1.1.1" targetFramework="net40" /> | |
<package id="xunit" version="1.9.1" targetFramework="net40" /> | |
</packages>*/ | |
//HUGE CAVEATE <<<<< I am not an expert on CQRS. Second, there is no "one way" to do CQRS. Third, CQRS isn't | |
//the best fit for every app, or every part of every app. That said, what I have below should provide a decent sketch. | |
//Overall, here are my thoughts.... first, using some of the available tools out there, it's pretty straightforward to implement. | |
// | |
//Second, it's easy to grow this up to your needs. More work at the low-end but less work at scale. Once synch with MemBus is too slow, | |
//we can take it out of band and make it asynch. Once MemBus isn't scaling in-proc for us, we can replace it with NServiceBus or our own implementation | |
//on top of ZeroMQ. We're storing denormalized data in Raven. Once Raven is the bottleneck (work with me and pretend it doesn't have sharding for a minute) then | |
//we could denormalize to multiple instances... or we could choose to put some things in both Raven and in to, say, a ring database like RRDTool or a | |
//bucket aggregator like StatsD. That's where things, in my opinion, get easier when you're message-based and passing commands around. | |
// | |
//Third, this is a lot more code than the CRUD-style I write. Part of that is based on how I've broken things out here.. .Part of it is based on | |
//just the overhead of CQRS on C# (at least my understanding of CQRS as expressed below). Rinat Abdullin has an approach of using a custom dialect + T4 parser | |
//to cull away the boilerplate stuff. | |
// | |
//At the end of the day, I believe this overall approach has a lot of merit and I'll be discussing adoption pros and cons with my coworker before we proceed. | |
//After I can hash things out with his brilliant mind, I'll write up my refined thoughts and a proper example. | |
//This is all xunit.net right now. It should be redone in mspec as it'd make following along much, much easier | |
//I'll probably do that later. Maybe. | |
//xUnit.net calls the constructor and dispose (if implemented) on your test class before each test. Consider that | |
//the test setup/teardown section. | |
namespace Tests.PropertyManagement | |
{ | |
//This is my mock UI. It'd be something like an MVC controller (well, FubuMVC in my case because I roll like that) | |
public class SomeAwesomeUI | |
{ | |
private readonly IBus _bus; | |
public SomeAwesomeUI(IBus bus) | |
{ | |
_bus = bus; | |
} | |
//Simulate the client kicking something off | |
public Guid CreateNewAccount() | |
{ | |
var createCommand = new CreateAccountCommand(Guid.NewGuid(), "Testy", new Address("2501 AStreet", "Denver", "CO")); | |
_bus.Publish(createCommand); | |
return createCommand.Id; | |
} | |
public void CloseAccount(Guid accountId) | |
{ | |
var closeCommand = new CloseAccountCommand(accountId); | |
_bus.Publish(closeCommand); | |
} | |
} | |
//Commands to do things are sent to your domain | |
//For a great discussion on validation with commands, check out http://ingebrigtsen.info/2012/07/28/cqrs-in-asp-net-mvc-with-bifrost/ | |
public class CreateAccountCommand | |
{ | |
public readonly Guid Id; | |
public readonly string Name; | |
public Address Address { get; set; } | |
public CreateAccountCommand(Guid id, string accountName, Address address) | |
{ | |
Id = id; | |
Name = accountName; | |
Address = address; | |
} | |
} | |
//A command doesn't need to carry state if you don't want it to... Here, we're just telling it the account id to close. | |
public class CloseAccountCommand | |
{ | |
public readonly Guid AccountId; | |
public CloseAccountCommand(Guid accountId) | |
{ | |
AccountId = accountId; | |
} | |
} | |
//By convention, I mark my command handlers with this interface. It's partially to handle wiring and partially | |
//so I can toss around things like contravariant | |
public interface IHandleCommand<in T> | |
{ | |
void Handle(T command); | |
} | |
//This is the handler that will apply commands to my domain. I could choose | |
//another round of some sort of non-business rule validation here. I could | |
//log stuff. Whatever. There's also no reason that you need one CommandHandler | |
//per command. I'm just doing this because I think this is how our real impl will | |
//shape out. | |
//IRepository comes from CommonDomain and is a facade over EventStore (both by Jonathan Oliver) | |
public class CreateAccountCommandHandler : IHandleCommand<CreateAccountCommand> | |
{ | |
private readonly IRepository _repository; | |
public CreateAccountCommandHandler(IRepository repository) | |
{ | |
_repository = repository; | |
} | |
public void Handle(CreateAccountCommand command) | |
{ | |
var account = new Account(command.Id, command.Name, command.Address); | |
_repository.Save(account, Guid.NewGuid()); | |
} | |
} | |
//This may _look_ like a normal "load this by id and then mutate state and then save it" repository | |
//However, it's actually loading the entire event stream for that object and re-applying the events to | |
//it. Don't worry, though, it's not re-publishing events to the bus.. it's just raising events | |
//internally to the object. Your domain object, at the end, will be the culmination of all of those | |
//applied events. This would be much simpler in F# of we thought about domain state as a left fold | |
//of immutable events causing state change. | |
// | |
//One neat thing about EventStore and, by extension, CommonDomain, is that you can load versions of your | |
//object. Check out the overloads on _repository.GetById some time. | |
public class DeactivateAccountCommandHandler : IHandleCommand<CloseAccountCommand> | |
{ | |
private readonly IRepository _repository; | |
public DeactivateAccountCommandHandler(IRepository repository) | |
{ | |
_repository = repository; | |
} | |
public void Handle(CloseAccountCommand command) | |
{ | |
var account = _repository.GetById<Account>(command.AccountId); | |
account.Close(); | |
_repository.Save(account, Guid.NewGuid()); | |
} | |
} | |
//By convention, I want to provide two means for creating domain objects. To the public, I want | |
//to provide an always-valid constructor. This explicitly shows what needs to be provided to the domain | |
//to create a valid instance of that object (eg, Person needs a twitter handle to be valid if I were doing twitter stream analysis) | |
//Internally, to EventStore, I want it to be able to create my object via a private ctor and I'm going to pass in the | |
//objects id. | |
//This method is pretty simplistic but my current domain suits it just fine. | |
public class AggregateFactory : IConstructAggregates | |
{ | |
public IAggregate Build(Type type, Guid id, IMemento snapshot) | |
{ | |
ConstructorInfo constructor = type.GetConstructor( | |
BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(Guid) }, null); | |
return constructor.Invoke(new object[] { id }) as IAggregate; | |
} | |
} | |
//Ok, so this is where things get a little.... interesting | |
//EventStore is sort of my coordinator for everything | |
//When I create a new domain object, I issue commands to it. It, in turn, raises events to change its internal state. | |
// | |
//Again, thing of the current state of a domain object as what you get after all events that built it are applied. | |
//new Person("@benhyr") might raise a PersonCreatedEvent. Then person.UpdateTwitterId("@hyrmn") raises a PersonUpdatedEvent | |
//When I load that Person from the EventStore, rather than getting Person.TwitterId from a db field, I'm getting PersonCreatedEvent | |
//(which sets the TwitterId initially) and then PersonUpdatedEvent (which updates the TwitterId to the new value) | |
// | |
//Now, back to this class. When I raise events, they're uncommitted until I persist them back to the EventStore. | |
//By default, we assume others might be interested in these uncommitted events. Of course, it's EventStore not EventStoreAndMessageBus | |
//(although EventStore could do some basic stuff for us). So, we're telling EventStore to publish to our MemBus bus... at some point, | |
//we might put NSB or MassTransit or EasyNetQ or whatever in place. | |
public static class DelegateDispatcher | |
{ | |
public static void DispatchCommit(IPublisher bus, Commit commit) | |
{ | |
// This is where we'd hook into our messaging infrastructure, such as NServiceBus, | |
// MassTransit, WCF, or some other communications infrastructure. | |
// This can be a class as well--just implement IDispatchCommits. | |
foreach (var @event in commit.Events) | |
bus.Publish(@event.Body); | |
} | |
} | |
//On to the concrete part of the spike... we have accounts, accounts are an aggregate in my domain. | |
//For this spike, accounts have a name, are active or inactive (in the real world, they're deactivated for many reasons, but not here) | |
//and an account has an address (in the real world, they actually have a couple addresses. Again, not germane to this spike) | |
//This is just a value object in DDD parlance. It has no identity itself because it's always owned by an entity object. | |
public class Address | |
{ | |
public Address(string line1, string city, string state) | |
{ | |
this.Line1 = line1; | |
this.City = city; | |
this.State = state; | |
} | |
public string Line1 { get; private set; } | |
public string City { get; private set; } | |
public string State { get; private set; } | |
} | |
//This is my old buddy Account. It inherits from AggregateBase, which comes from CommonDomain. | |
//There's no real need to bring CommonDomain in if you don't want. It provides a couple simple mechanisms for me. | |
//First, it gives me the IRepository wrapper around EventStore which I use above in my CommandHandlers | |
//Second, it gives me a base that tracks all of my uncommitted changes for me. | |
//Third, it wires up, by convention, my event handlers (the private void Apply(SomeEvent @event) methods | |
public class Account : AggregateBase | |
{ | |
public string Name { get; set; } | |
public bool IsActive { get; set; } | |
public Address Address { get; set; } | |
private Account(Guid id) | |
{ | |
this.Id = id; | |
} | |
public Account(Guid id, string name, Address address) | |
{ | |
this.Id = id; | |
// the event will be routed by convention to a method called ApplyEvent(type of event) | |
RaiseEvent(new AccountCreatedEvent | |
{ | |
Id = this.Id, | |
Name = name, | |
Address = address, | |
IsActive = true | |
}); | |
} | |
public void Close() | |
{ | |
this.RaiseEvent(new AccountClosedEvent()); | |
} | |
private void Apply(AccountCreatedEvent @event) | |
{ | |
this.Id = @event.Id; | |
this.Name = @event.Name; | |
this.Address = @event.Address; | |
} | |
private void Apply(AccountClosedEvent @event) | |
{ | |
this.IsActive = false; | |
} | |
} | |
//A marker interface I have for my own sanity. Useful for convention-based | |
//analysis and verification | |
public interface IDomainEvent | |
{ | |
} | |
//This is going to seem a bit conflated so bear with me. When we create a new Account, | |
//we raise an AccountCreatedEvent. We then apply that AccountCreatedEvent to ourselves. | |
//Once we save our uncommitted events to EventStore, then that AccountCreatedEvent is also | |
//sent out to our bus for other interested parties. | |
[Serializable] | |
public class AccountCreatedEvent : IDomainEvent | |
{ | |
public Guid Id { get; set; } | |
public string Name { get; set; } | |
public bool IsActive { get; set; } | |
public Address Address { get; set; } | |
} | |
[Serializable] | |
public class AccountClosedEvent : IDomainEvent | |
{ | |
} | |
//Again, another convention interface so I can tell my bus how to resolve my handlers. | |
//Any party that wants to know about a particular event will mark itself as such. | |
public interface IHandleEvent<in T> | |
{ | |
void Handle(T @event); | |
} | |
//Normally this class would do something awesome like update Raven | |
//There's no reason for this to be a single denormalizer | |
//However, there's no reason for this to be multiple denormalizers. Design decision! | |
//Our use-case in production is that our denormalizers will update our flattened models in RavenDB | |
//although, honestly, it could be SQL Server, Mongo, Raik, whatever. | |
public class AccountDenormalizer : IHandleEvent<AccountCreatedEvent>, | |
IHandleEvent<AccountClosedEvent> | |
{ | |
public string AccountName { get; set; } | |
public bool IsActive { get; set; } | |
public void Handle(AccountCreatedEvent @event) | |
{ | |
AccountName = @event.Name; | |
} | |
public void Handle(AccountClosedEvent @event) | |
{ | |
IsActive = false; | |
} | |
} | |
//And, to show multiple event handlers in action, here's a handler that might | |
//do something like send an email welcoming the person that just registered | |
//or maybe a cool SignalR tie-in that goes to the sales dashboard | |
//or a web service endpoint that has a Netduino polling it and ringing a gong when | |
//someone signs up. You know, whatever. | |
public class KaChingNotifier : IHandleEvent<AccountCreatedEvent> | |
{ | |
public void Handle(AccountCreatedEvent @event) | |
{ | |
Console.WriteLine("Dude, we got a customer, we're gonna be rich!"); | |
} | |
} | |
public class OmgSadnessNotifier : IHandleEvent<AccountClosedEvent> | |
{ | |
public void Handle(AccountClosedEvent @event) | |
{ | |
Console.WriteLine("Dude, we lost a customer... start the layoffs :("); | |
} | |
} | |
//and, now we're into the meat of the spike. This is the xUnit.net class under test | |
//a lot of this looks repetitive because, well, it was written as I was increasing my | |
//understanding of things. | |
public class EndToEndTest | |
{ | |
private readonly SomeAwesomeUI _client; | |
private readonly IBus _bus; | |
//Here, I'm wiring up my MemBus instance and telling it how to resolve my subscribers | |
//MemBus also has an awesome way to resolve subscribers from an IoC container. In prod, | |
//I'll wire my subscribers into StructureMap and have MemBus resolve them from there. | |
//I'm also initializing my awesome test client UI which, if you'll recall from way back at the start | |
//simply publishes commands to my MemBus instance (and, again, it could be whatever) | |
public EndToEndTest() | |
{ | |
_bus = BusSetup.StartWith<Conservative>() | |
.Apply<FlexibleSubscribeAdapter>(a => | |
{ | |
a.ByInterface(typeof(IHandleEvent<>)); | |
a.ByInterface(typeof(IHandleCommand<>)); | |
}) | |
.Construct(); | |
_client = new SomeAwesomeUI(_bus); | |
} | |
[Fact] | |
public void CanPublishCreateAccountCommand() | |
{ | |
Should.NotThrow(() => _client.CreateNewAccount()); | |
} | |
[Fact] | |
public void CanReceiveCreateAccountCommand() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence().Build(); | |
var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
_bus.Subscribe(handler); | |
Should.NotThrow(() => _client.CreateNewAccount()); | |
} | |
[Fact] | |
public void CreateAccountEventIsStored() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence().Build(); | |
var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()); | |
var handler = new CreateAccountCommandHandler(repository); | |
_bus.Subscribe(handler); | |
var accountId = _client.CreateNewAccount(); | |
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBeGreaterThan(0); | |
} | |
[Fact] | |
public void CanLoadAccountFromEventStore() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence().Build(); | |
var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()); | |
var handler = new CreateAccountCommandHandler(repository); | |
_bus.Subscribe(handler); | |
var accountId = _client.CreateNewAccount(); | |
var account = repository.GetById<Account>(accountId); | |
account.ShouldNotBe(null); | |
account.Name.ShouldBe("Testy"); | |
} | |
[Fact] | |
public void CreateAccountEventIsPublishedToBus() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence() | |
.UsingSynchronousDispatchScheduler() | |
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c))) | |
.Build(); | |
var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
var denormalizer = new AccountDenormalizer(); | |
_bus.Subscribe(handler); | |
_bus.Subscribe(denormalizer); | |
_client.CreateNewAccount(); | |
denormalizer.AccountName.ShouldBe("Testy"); | |
} | |
[Fact] | |
public void DeactivingAccountDoesntRetriggerInitialCreate() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence() | |
.UsingSynchronousDispatchScheduler() | |
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c))) | |
.Build(); | |
var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
var denormalizer = new AccountDenormalizer(); | |
_bus.Subscribe(createHandler); | |
_bus.Subscribe(deactivateHandler); | |
_bus.Subscribe(denormalizer); | |
var accountId = _client.CreateNewAccount(); | |
_client.CloseAccount(accountId); | |
denormalizer.AccountName.ShouldBe("Testy"); | |
denormalizer.IsActive.ShouldBe(false); | |
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2); | |
} | |
//For fun, run this with the Debugger (eg, if using TDD.NET then right click on this method and select Test With -> Debugger. | |
//Put break points in various spots of the code above and see what happens. | |
[Fact] | |
public void TyingtTogether() | |
{ | |
var store = Wireup.Init().UsingInMemoryPersistence() | |
.UsingSynchronousDispatchScheduler() | |
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c))) | |
.Build(); | |
var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector())); | |
var denormalizer = new AccountDenormalizer(); | |
_bus.Subscribe(createHandler); | |
_bus.Subscribe(deactivateHandler); | |
_bus.Subscribe(denormalizer); | |
_bus.Subscribe(new KaChingNotifier()); | |
_bus.Subscribe(new OmgSadnessNotifier()); | |
var accountId = _client.CreateNewAccount(); | |
_client.CloseAccount(accountId); | |
denormalizer.AccountName.ShouldBe("Testy"); | |
denormalizer.IsActive.ShouldBe(false); | |
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment