
@hyrmn
Created July 29, 2012 16:27
My incomplete but evolving understanding of CQRS
using System;
using System.Reflection;
using CommonDomain;
using CommonDomain.Core;
using CommonDomain.Persistence;
using CommonDomain.Persistence.EventStore;
using EventStore;
using EventStore.Dispatcher;
using MemBus;
using MemBus.Configurators;
using MemBus.Subscribing;
using Shouldly;
using Xunit;
//These are the nuget packages I'm using
/*<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="CommonDomain" version="1.4.0" targetFramework="net40" />
<package id="EventStore" version="3.0.11326.44" targetFramework="net40" />
<package id="MemBus" version="1.5.4" targetFramework="net40" />
<package id="Shouldly" version="1.1.1.1" targetFramework="net40" />
<package id="xunit" version="1.9.1" targetFramework="net40" />
</packages>*/
//HUGE CAVEAT: First, I am not an expert on CQRS. Second, there is no "one way" to do CQRS. Third, CQRS isn't
//the best fit for every app, or every part of every app. That said, what I have below should provide a decent sketch.
//Overall, here are my thoughts. First, using some of the available tools out there, it's pretty straightforward to implement.
//
//Second, it's easy to grow this up to your needs. More work at the low end but less work at scale. Once synchronous dispatch with MemBus is too slow,
//we can take it out of band and make it asynchronous. Once MemBus isn't scaling in-proc for us, we can replace it with NServiceBus or our own implementation
//on top of ZeroMQ. We're storing denormalized data in Raven. Once Raven is the bottleneck (work with me and pretend it doesn't have sharding for a minute) then
//we could denormalize to multiple instances... or we could choose to put some things both in Raven and in, say, a round-robin database like RRDTool or a
//bucket aggregator like StatsD. That's where things, in my opinion, get easier when you're message-based and passing commands around.
//
//Third, this is a lot more code than the CRUD-style I write. Part of that is based on how I've broken things out here. Part of it is
//just the overhead of CQRS in C# (at least my understanding of CQRS as expressed below). Rinat Abdullin has an approach of using a custom dialect + T4 parser
//to cull away the boilerplate stuff.
//
//At the end of the day, I believe this overall approach has a lot of merit and I'll be discussing adoption pros and cons with my coworker before we proceed.
//After I can hash things out with his brilliant mind, I'll write up my refined thoughts and a proper example.
//This is all xunit.net right now. It should be redone in mspec as it'd make following along much, much easier.
//I'll probably do that later. Maybe.
//xUnit.net creates a new instance of your test class for each test: it calls the constructor before the test
//and Dispose (if implemented) after it. Consider that the test setup/teardown section.
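//A tiny sketch of that lifecycle, kept separate from the spike itself. Because xUnit.net builds a fresh
//instance of the class for every [Fact], the constructor acts as setup and Dispose acts as teardown.
//LifecycleSketch and its members are hypothetical names made up for this illustration.
public class LifecycleSketch : IDisposable
{
private readonly System.Collections.Generic.List<string> _items;
public LifecycleSketch()
{
//"Setup": runs before each test, so every test starts from an empty list
_items = new System.Collections.Generic.List<string>();
}
[Fact]
public void FirstTestSeesAnEmptyList()
{
_items.Add("first");
Assert.Equal(1, _items.Count);
}
[Fact]
public void SecondTestAlsoSeesAnEmptyList()
{
_items.Add("second");
Assert.Equal(1, _items.Count);
}
public void Dispose()
{
//"Teardown": runs after each test
_items.Clear();
}
}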
namespace Tests.PropertyManagement
{
//This is my mock UI. It'd be something like an MVC controller (well, FubuMVC in my case because I roll like that)
public class SomeAwesomeUI
{
private readonly IBus _bus;
public SomeAwesomeUI(IBus bus)
{
_bus = bus;
}
//Simulate the client kicking something off
public Guid CreateNewAccount()
{
var createCommand = new CreateAccountCommand(Guid.NewGuid(), "Testy", new Address("2501 AStreet", "Denver", "CO"));
_bus.Publish(createCommand);
return createCommand.Id;
}
public void CloseAccount(Guid accountId)
{
var closeCommand = new CloseAccountCommand(accountId);
_bus.Publish(closeCommand);
}
}
//Commands to do things are sent to your domain
//For a great discussion on validation with commands, check out http://ingebrigtsen.info/2012/07/28/cqrs-in-asp-net-mvc-with-bifrost/
public class CreateAccountCommand
{
public readonly Guid Id;
public readonly string Name;
public Address Address { get; set; }
public CreateAccountCommand(Guid id, string accountName, Address address)
{
Id = id;
Name = accountName;
Address = address;
}
}
//A command doesn't need to carry state if you don't want it to... Here, we're just telling it the account id to close.
public class CloseAccountCommand
{
public readonly Guid AccountId;
public CloseAccountCommand(Guid accountId)
{
AccountId = accountId;
}
}
//By convention, I mark my command handlers with this interface. It's partially to handle wiring and partially
//so I can take advantage of contravariance (the "in T" below).
public interface IHandleCommand<in T>
{
void Handle(T command);
}
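//A small sketch of what the "in T" buys: a handler written against a base type can stand in
//wherever a handler for a more specific command is expected. LoggingHandler and ContravarianceSketch
//are hypothetical names for illustration only; nothing else in the spike uses them.
public class LoggingHandler : IHandleCommand<object>
{
public int Handled { get; private set; }
public void Handle(object command)
{
Handled++;
Console.WriteLine("Handling " + command.GetType().Name);
}
}
public static class ContravarianceSketch
{
public static int Run()
{
var logger = new LoggingHandler();
//This assignment compiles only because IHandleCommand<in T> is contravariant in T
IHandleCommand<CloseAccountCommand> closeHandler = logger;
closeHandler.Handle(new CloseAccountCommand(Guid.NewGuid()));
return logger.Handled;
}
}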
//This is the handler that will apply commands to my domain. I could choose
//another round of some sort of non-business rule validation here. I could
//log stuff. Whatever. There's also no reason that you need one CommandHandler
//per command. I'm just doing this because I think this is how our real impl will
//shape out.
//IRepository comes from CommonDomain and is a facade over EventStore (both by Jonathan Oliver)
public class CreateAccountCommandHandler : IHandleCommand<CreateAccountCommand>
{
private readonly IRepository _repository;
public CreateAccountCommandHandler(IRepository repository)
{
_repository = repository;
}
public void Handle(CreateAccountCommand command)
{
var account = new Account(command.Id, command.Name, command.Address);
_repository.Save(account, Guid.NewGuid());
}
}
//This may _look_ like a normal "load this by id and then mutate state and then save it" repository
//However, it's actually loading the entire event stream for that object and re-applying the events to
//it. Don't worry, though, it's not re-publishing events to the bus.. it's just raising events
//internally to the object. Your domain object, at the end, will be the culmination of all of those
//applied events. This would be much simpler in F# if we thought about domain state as a left fold
//of immutable events causing state change.
//
//One neat thing about EventStore and, by extension, CommonDomain, is that you can load versions of your
//object. Check out the overloads on _repository.GetById some time.
public class DeactivateAccountCommandHandler : IHandleCommand<CloseAccountCommand>
{
private readonly IRepository _repository;
public DeactivateAccountCommandHandler(IRepository repository)
{
_repository = repository;
}
public void Handle(CloseAccountCommand command)
{
var account = _repository.GetById<Account>(command.AccountId);
account.Close();
_repository.Save(account, Guid.NewGuid());
}
}
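//The comment above DeactivateAccountCommandHandler describes state as a left fold of immutable events.
//Here is a minimal sketch of that idea in C# rather than F#. None of this comes from CommonDomain or
//EventStore; LeftFoldSketch, AccountState and Replay are hypothetical names made up for illustration.
public static class LeftFoldSketch
{
//Immutable snapshot of the bits of account state we care about
public sealed class AccountState
{
public readonly string Name;
public readonly bool IsActive;
public AccountState(string name, bool isActive)
{
Name = name;
IsActive = isActive;
}
}
//Each event maps the previous state to a new state; nothing is mutated
private static AccountState Apply(AccountState state, object @event)
{
var created = @event as AccountCreatedEvent;
if (created != null) return new AccountState(created.Name, created.IsActive);
if (@event is AccountClosedEvent) return new AccountState(state.Name, false);
return state; //unknown events leave the state untouched
}
//Enumerable.Aggregate is the left fold: start from a seed, then Apply each event in order
public static AccountState Replay(System.Collections.Generic.IEnumerable<object> events)
{
return System.Linq.Enumerable.Aggregate(
events, new AccountState(null, false), (state, @event) => Apply(state, @event));
}
}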
//By convention, I want to provide two means for creating domain objects. To the public, I want
//to provide an always-valid constructor. This explicitly shows what needs to be provided to the domain
//to create a valid instance of that object (eg, Person needs a twitter handle to be valid if I were doing twitter stream analysis)
//Internally, to EventStore, I want it to be able to create my object via a private ctor and I'm going to pass in the
//object's id.
//This method is pretty simplistic but my current domain suits it just fine.
public class AggregateFactory : IConstructAggregates
{
public IAggregate Build(Type type, Guid id, IMemento snapshot)
{
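//The snapshot is ignored here; every aggregate is rebuilt purely from its events.
ConstructorInfo constructor = type.GetConstructor(
BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(Guid) }, null);
if (constructor == null)
throw new InvalidOperationException(type.Name + " needs a non-public constructor that takes a Guid id.");
return constructor.Invoke(new object[] { id }) as IAggregate;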
}
}
//Ok, so this is where things get a little.... interesting
//EventStore is sort of my coordinator for everything
//When I create a new domain object, I issue commands to it. It, in turn, raises events to change its internal state.
//
//Again, think of the current state of a domain object as what you get after all events that built it are applied.
//new Person("@benhyr") might raise a PersonCreatedEvent. Then person.UpdateTwitterId("@hyrmn") raises a PersonUpdatedEvent
//When I load that Person from the EventStore, rather than getting Person.TwitterId from a db field, I'm getting PersonCreatedEvent
//(which sets the TwitterId initially) and then PersonUpdatedEvent (which updates the TwitterId to the new value)
//
//Now, back to this class. When I raise events, they're uncommitted until I persist them back to the EventStore.
//By default, we assume others might be interested in these uncommitted events. Of course, it's EventStore not EventStoreAndMessageBus
//(although EventStore could do some basic stuff for us). So, we're telling EventStore to publish to our MemBus bus... at some point,
//we might put NSB or MassTransit or EasyNetQ or whatever in place.
public static class DelegateDispatcher
{
public static void DispatchCommit(IPublisher bus, Commit commit)
{
// This is where we'd hook into our messaging infrastructure, such as NServiceBus,
// MassTransit, WCF, or some other communications infrastructure.
// This can be a class as well--just implement IDispatchCommits.
foreach (var @event in commit.Events)
bus.Publish(@event.Body);
}
}
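//A sketch of the class-based alternative mentioned in the comment inside DispatchCommit. It assumes
//EventStore 3.0's IDispatchCommits consists of "void Dispatch(Commit commit)" plus IDisposable; check
//that against the version you have installed. MemBusCommitDispatcher is a hypothetical name.
public class MemBusCommitDispatcher : IDispatchCommits
{
private readonly IPublisher _bus;
public MemBusCommitDispatcher(IPublisher bus)
{
_bus = bus;
}
public void Dispatch(Commit commit)
{
//Same loop as DelegateDispatcher.DispatchCommit: publish each event body to the bus
foreach (var @event in commit.Events)
_bus.Publish(@event.Body);
}
public void Dispose()
{
//Nothing to clean up; the bus is owned by whoever created it
}
}
//If that assumption holds, the wireup would read: .DispatchTo(new MemBusCommitDispatcher(_bus))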
//On to the concrete part of the spike... we have accounts, accounts are an aggregate in my domain.
//For this spike, accounts have a name, are active or inactive (in the real world, they're deactivated for many reasons, but not here)
//and an account has an address (in the real world, they actually have a couple addresses. Again, not germane to this spike)
//This is just a value object in DDD parlance. It has no identity itself because it's always owned by an entity object.
public class Address
{
public Address(string line1, string city, string state)
{
this.Line1 = line1;
this.City = city;
this.State = state;
}
public string Line1 { get; private set; }
public string City { get; private set; }
public string State { get; private set; }
}
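//Because a value object has no identity, two addresses are "the same" when all of their parts match.
//A sketch of that comparison as a standalone comparer, so the Address class above stays untouched.
//AddressValueComparer is a hypothetical name; the spike itself never needs to compare addresses.
public class AddressValueComparer : System.Collections.Generic.IEqualityComparer<Address>
{
public bool Equals(Address x, Address y)
{
if (object.ReferenceEquals(x, y)) return true;
if (object.ReferenceEquals(x, null) || object.ReferenceEquals(y, null)) return false;
return x.Line1 == y.Line1 && x.City == y.City && x.State == y.State;
}
public int GetHashCode(Address address)
{
if (object.ReferenceEquals(address, null)) return 0;
unchecked
{
//Combine the parts so that equal addresses always produce equal hash codes
var hash = 17;
hash = hash * 31 + (address.Line1 ?? "").GetHashCode();
hash = hash * 31 + (address.City ?? "").GetHashCode();
hash = hash * 31 + (address.State ?? "").GetHashCode();
return hash;
}
}
}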
//This is my old buddy Account. It inherits from AggregateBase, which comes from CommonDomain.
//There's no real need to bring CommonDomain in if you don't want. It provides a couple simple mechanisms for me.
//First, it gives me the IRepository wrapper around EventStore which I use above in my CommandHandlers
//Second, it gives me a base that tracks all of my uncommitted changes for me.
//Third, it wires up, by convention, my event handlers (the private void Apply(SomeEvent @event) methods).
public class Account : AggregateBase
{
public string Name { get; set; }
public bool IsActive { get; set; }
public Address Address { get; set; }
private Account(Guid id)
{
this.Id = id;
}
public Account(Guid id, string name, Address address)
{
this.Id = id;
// the event will be routed by convention to a private method called Apply(type of event)
RaiseEvent(new AccountCreatedEvent
{
Id = this.Id,
Name = name,
Address = address,
IsActive = true
});
}
public void Close()
{
this.RaiseEvent(new AccountClosedEvent());
}
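private void Apply(AccountCreatedEvent @event)
{
this.Id = @event.Id;
this.Name = @event.Name;
this.Address = @event.Address;
//Without this, a replayed account would never come back as active
this.IsActive = @event.IsActive;
}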
private void Apply(AccountClosedEvent @event)
{
this.IsActive = false;
}
}
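//A rough sketch of how convention-based routing to private Apply(SomeEvent) methods can work.
//This is NOT CommonDomain's actual router, just a reflection-based illustration of the idea;
//ApplyMethodRouterSketch is a hypothetical name.
public static class ApplyMethodRouterSketch
{
//Returns true when a matching Apply method was found and invoked
public static bool Route(object aggregate, object @event)
{
//Look for a non-public instance method named Apply whose single parameter is the event's type
var method = aggregate.GetType().GetMethod(
"Apply",
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new[] { @event.GetType() },
null);
if (method == null) return false; //nothing follows the convention for this event
method.Invoke(aggregate, new[] { @event });
return true;
}
}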
//A marker interface I have for my own sanity. Useful for convention-based
//analysis and verification
public interface IDomainEvent
{
}
//This is going to seem a bit conflated so bear with me. When we create a new Account,
//we raise an AccountCreatedEvent. We then apply that AccountCreatedEvent to ourselves.
//Once we save our uncommitted events to EventStore, then that AccountCreatedEvent is also
//sent out to our bus for other interested parties.
[Serializable]
public class AccountCreatedEvent : IDomainEvent
{
public Guid Id { get; set; }
public string Name { get; set; }
public bool IsActive { get; set; }
public Address Address { get; set; }
}
[Serializable]
public class AccountClosedEvent : IDomainEvent
{
}
//Again, another convention interface so I can tell my bus how to resolve my handlers.
//Any party that wants to know about a particular event will mark itself as such.
public interface IHandleEvent<in T>
{
void Handle(T @event);
}
//Normally this class would do something awesome like update Raven.
//There's no reason this has to be a single denormalizer.
//Then again, there's no reason it has to be multiple denormalizers either. Design decision!
//Our use case in production is that our denormalizers will update our flattened models in RavenDB
//although, honestly, it could be SQL Server, Mongo, Riak, whatever.
public class AccountDenormalizer : IHandleEvent<AccountCreatedEvent>,
IHandleEvent<AccountClosedEvent>
{
public string AccountName { get; set; }
public bool IsActive { get; set; }
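public void Handle(AccountCreatedEvent @event)
{
AccountName = @event.Name;
//Track the active flag too, so a later IsActive == false really reflects the close event
IsActive = @event.IsActive;
}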
public void Handle(AccountClosedEvent @event)
{
IsActive = false;
}
}
//And, to show multiple event handlers in action, here's a handler that might
//do something like send an email welcoming the person that just registered
//or maybe a cool SignalR tie-in that goes to the sales dashboard
//or a web service endpoint that has a Netduino polling it and ringing a gong when
//someone signs up. You know, whatever.
public class KaChingNotifier : IHandleEvent<AccountCreatedEvent>
{
public void Handle(AccountCreatedEvent @event)
{
Console.WriteLine("Dude, we got a customer, we're gonna be rich!");
}
}
public class OmgSadnessNotifier : IHandleEvent<AccountClosedEvent>
{
public void Handle(AccountClosedEvent @event)
{
Console.WriteLine("Dude, we lost a customer... start the layoffs :(");
}
}
//And now we're into the meat of the spike. This is the xUnit.net test class.
//A lot of this looks repetitive because, well, it was written as I was increasing my
//understanding of things.
public class EndToEndTest
{
private readonly SomeAwesomeUI _client;
private readonly IBus _bus;
//Here, I'm wiring up my MemBus instance and telling it how to resolve my subscribers
//MemBus also has an awesome way to resolve subscribers from an IoC container. In prod,
//I'll wire my subscribers into StructureMap and have MemBus resolve them from there.
//I'm also initializing my awesome test client UI which, if you'll recall from way back at the start
//simply publishes commands to my MemBus instance (and, again, it could be whatever)
public EndToEndTest()
{
_bus = BusSetup.StartWith<Conservative>()
.Apply<FlexibleSubscribeAdapter>(a =>
{
a.ByInterface(typeof(IHandleEvent<>));
a.ByInterface(typeof(IHandleCommand<>));
})
.Construct();
_client = new SomeAwesomeUI(_bus);
}
[Fact]
public void CanPublishCreateAccountCommand()
{
Should.NotThrow(() => _client.CreateNewAccount());
}
[Fact]
public void CanReceiveCreateAccountCommand()
{
var store = Wireup.Init().UsingInMemoryPersistence().Build();
var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
_bus.Subscribe(handler);
Should.NotThrow(() => _client.CreateNewAccount());
}
[Fact]
public void CreateAccountEventIsStored()
{
var store = Wireup.Init().UsingInMemoryPersistence().Build();
var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
var handler = new CreateAccountCommandHandler(repository);
_bus.Subscribe(handler);
var accountId = _client.CreateNewAccount();
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBeGreaterThan(0);
}
[Fact]
public void CanLoadAccountFromEventStore()
{
var store = Wireup.Init().UsingInMemoryPersistence().Build();
var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
var handler = new CreateAccountCommandHandler(repository);
_bus.Subscribe(handler);
var accountId = _client.CreateNewAccount();
var account = repository.GetById<Account>(accountId);
account.ShouldNotBe(null);
account.Name.ShouldBe("Testy");
}
[Fact]
public void CreateAccountEventIsPublishedToBus()
{
var store = Wireup.Init().UsingInMemoryPersistence()
.UsingSynchronousDispatchScheduler()
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
.Build();
var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
var denormalizer = new AccountDenormalizer();
_bus.Subscribe(handler);
_bus.Subscribe(denormalizer);
_client.CreateNewAccount();
denormalizer.AccountName.ShouldBe("Testy");
}
[Fact]
public void DeactivatingAccountDoesntRetriggerInitialCreate()
{
var store = Wireup.Init().UsingInMemoryPersistence()
.UsingSynchronousDispatchScheduler()
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
.Build();
var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
var denormalizer = new AccountDenormalizer();
_bus.Subscribe(createHandler);
_bus.Subscribe(deactivateHandler);
_bus.Subscribe(denormalizer);
var accountId = _client.CreateNewAccount();
_client.CloseAccount(accountId);
denormalizer.AccountName.ShouldBe("Testy");
denormalizer.IsActive.ShouldBe(false);
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
}
//For fun, run this with the debugger (e.g., if using TestDriven.NET, right-click on this method and select Test With -> Debugger).
//Put breakpoints in various spots of the code above and see what happens.
[Fact]
public void TyingItTogether()
{
var store = Wireup.Init().UsingInMemoryPersistence()
.UsingSynchronousDispatchScheduler()
.DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
.Build();
var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
var denormalizer = new AccountDenormalizer();
_bus.Subscribe(createHandler);
_bus.Subscribe(deactivateHandler);
_bus.Subscribe(denormalizer);
_bus.Subscribe(new KaChingNotifier());
_bus.Subscribe(new OmgSadnessNotifier());
var accountId = _client.CreateNewAccount();
_client.CloseAccount(accountId);
denormalizer.AccountName.ShouldBe("Testy");
denormalizer.IsActive.ShouldBe(false);
store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
}
}
}