Last active
December 17, 2015 08:49
-
-
Save mikeeast/5582381 to your computer and use it in GitHub Desktop.
This is how I perform the poor man's message reordering.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace MyNamespace.ReadModel.Data.Models | |
{ | |
/// <summary>
/// Bookkeeping record for one event received for an aggregate. Instances are
/// kept in <c>EventAggregate.Events</c> and are created and updated by
/// <c>EventVersionOrderingHandler</c>.
/// </summary>
public class Event
{
    /// <summary>Version of the event; compared against the expected next version when ordering.</summary>
    public int EventVersion { get; set; }

    /// <summary>Commit version copied from the event context when the event is first seen.</summary>
    public int CommitVersion { get; set; }

    /// <summary>When the event was first received (the ordering handler stores <c>DateTime.UtcNow</c>).</summary>
    public DateTime ReceivedDate { get; set; }

    /// <summary>Date taken from the event context; <c>default(DateTime)</c> when the context has none.</summary>
    public DateTime EventDate { get; set; }

    /// <summary>User name taken from the event context.</summary>
    public string UserName { get; set; }

    /// <summary>Full CLR type name of the received message.</summary>
    public string EventType { get; set; }

    /// <summary>The received message serialized as JSON.</summary>
    public string Payload { get; set; }

    /// <summary>How many times the message has been sent back to the queue because it arrived out of order.</summary>
    public int Retries { get; set; }

    /// <summary><c>true</c> once the event has been accepted in its correct position in the sequence.</summary>
    public bool Handled { get; set; }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
namespace MyNamespace.ReadModel.Data.Models | |
{ | |
/// <summary>
/// Document that collects the <see cref="Event"/> records seen for a single
/// aggregate root. The aggregate root id is handed to the <c>Dto</c> base class.
/// </summary>
public class EventAggregate : Dto
{
    /// <summary>
    /// Creates an aggregate for <paramref name="aggregateRootId"/> with an empty event list.
    /// </summary>
    /// <param name="aggregateRootId">Identifier of the aggregate root whose events are tracked.</param>
    public EventAggregate(Guid aggregateRootId)
        : base(aggregateRootId)
    {
        this.Events = new List<Event>();
    }

    /// <summary>
    /// Events recorded for this aggregate. The list instance is assigned only by
    /// this class; callers add to and modify the returned list in place.
    /// </summary>
    public IList<Event> Events { get; private set; }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using MyNamespace.ReadModel.Data.Models; | |
using Raven.Client; | |
using Raven.Client.Linq; | |
namespace MyNamespace.ReadModel.Web.EventHandlers | |
{ | |
/// <summary>
/// Base class for event handlers that work on a document of type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type of the document the handler loads.</typeparam>
public class EventHandlerBase<T>
{
    /// <summary>
    /// Loads the <typeparamref name="T"/> document for <paramref name="id"/> through the
    /// <c>LoadWithId</c> session extension.
    /// </summary>
    /// <param name="session">Session used to load the document.</param>
    /// <param name="id">Identifier from which the document id is derived.</param>
    /// <returns>
    /// Whatever the session returns for that id (callers in this code base
    /// treat <c>null</c> as "not found").
    /// </returns>
    protected T Load(IDocumentSession session, Guid id)
    {
        T document = session.LoadWithId<T>(id);
        return document;
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Threading; | |
using NServiceBus; | |
using MyNamespace.Core.Extensions; | |
using MyNamespace.Core.Support.Logging; | |
using MyNamespace.Messages.Infrastructure; | |
using MyNamespace.ReadModel.Data.Models; | |
using Raven.Abstractions.Exceptions; | |
using Raven.Client; | |
using Raven.Client.Document; | |
using log4net; | |
namespace MyNamespace.ReadModel.EventHandlers | |
{ | |
/// <summary>
/// "Poor man's" message re-ordering: records every event received for an
/// aggregate in an <see cref="EventAggregate"/> document and only lets an event
/// continue to the remaining handlers when its version is the next expected one.
/// Events that arrive too early are sent to the back of the queue via
/// <c>IBus.HandleCurrentMessageLater</c> and dispatching is stopped.
/// </summary>
public class EventVersionOrderingHandler :
    EventHandlerBase<EventAggregate>,
    IHandleMessages<IEvent>
{
    // An out-of-order event is re-queued while its retry counter is at most this
    // value; once the counter exceeds it the handler throws instead.
    const int MaxRetries = 10;

    readonly IDocumentSession session;
    readonly ICreateEventContexts eventContextFactory;
    readonly IBus bus;

    /// <summary>
    /// Creates the handler. <see cref="Log"/> starts out as <c>NullLogger.Instance</c>
    /// and can be replaced through the property setter.
    /// </summary>
    /// <param name="session">Document session used to load and store the event bookkeeping document.</param>
    /// <param name="eventContextFactory">Factory that extracts aggregate id, versions and metadata from a message.</param>
    /// <param name="bus">Bus used to defer the current message and stop its dispatch.</param>
    public EventVersionOrderingHandler(IDocumentSession session, ICreateEventContexts eventContextFactory, IBus bus)
    {
        this.session = session;
        this.eventContextFactory = eventContextFactory;
        this.bus = bus;
        Log = NullLogger.Instance;
    }

    /// <summary>Logger used for ordering diagnostics.</summary>
    public ILog Log { get; set; }

    /// <summary>
    /// Records <paramref name="message"/> against its aggregate and decides whether it may
    /// be processed now (its version is the expected one) or must be retried later.
    /// </summary>
    /// <param name="message">The incoming event.</param>
    /// <exception cref="InvalidOperationException">
    /// The handled versions of the aggregate contain a gap, or the event has already been
    /// re-queued more than the allowed number of times.
    /// </exception>
    public void Handle(IEvent message)
    {
        session.Advanced.AllowNonAuthoritativeInformation = false;
        session.Advanced.UseOptimisticConcurrency = true;

        var eventContext = eventContextFactory.Create(message);

        var eventAggregate = Load(session, eventContext.AggregateRootId);
        if (eventAggregate == null)
        {
            eventAggregate = new EventAggregate(eventContext.AggregateRootId);
            session.Store(eventAggregate);
        }

        // Captured before any modification so the final Store can pass it back.
        var etag = session.Advanced.GetEtagFor(eventAggregate);

        // Throws when the handled versions are inconsistent; this happens before the
        // incoming event is recorded, exactly as in the original flow.
        var expectedEventVersion = GetExpectedEventVersion(
            eventAggregate,
            eventContext.AggregateRootId,
            eventContext.EventVersion);

        // Re-use the existing record when this version has been seen before
        // (for example a message that was re-queued earlier).
        var evt = eventAggregate.Events
            .SingleOrDefault(e => e.EventVersion == eventContext.EventVersion);
        if (evt == null)
        {
            evt = new Event
            {
                EventVersion = eventContext.EventVersion,
                ReceivedDate = DateTime.UtcNow,
                EventDate = eventContext.EventDate.GetValueOrDefault(),
                UserName = eventContext.UserName,
                CommitVersion = eventContext.CommitVersion,
                EventType = message.GetType().FullName,
                Payload = message.AsJson(),
                Retries = 0,
                Handled = false
            };
            eventAggregate.Events.Add(evt);
        }

        if (expectedEventVersion != eventContext.EventVersion)
        {
            if (evt.Retries > MaxRetries)
            {
                throw new InvalidOperationException(string.Format("I have retried this message aggregate '{0}' the maximum number of times ({1}) and will throw it out to be handled. I expected version {2}, but received version {3}.", eventContext.AggregateRootId, evt.Retries, expectedEventVersion, eventContext.EventVersion));
            }

            Log.Info(string.Format("I will not handle this message for aggregate '{0}'. I expected version {1}, but received version {2}. Sending to back of queue.", eventContext.AggregateRootId, expectedEventVersion, eventContext.EventVersion));
            DeferCurrentMessage();
            evt.Retries++;
        }
        else
        {
            evt.Handled = true;
        }

        try
        {
            session.Store(eventAggregate, etag.GetValueOrDefault());
        }
        catch (ConcurrencyException concurrencyException)
        {
            // NOTE(review): with the RavenDB client the optimistic-concurrency check is
            // normally performed by SaveChanges (called from the unit of work), not by
            // Store - confirm that this catch block can actually be reached.
            Log.Warn("A concurrency exception occurred when ordering messages", concurrencyException);
            DeferCurrentMessage();
        }
    }

    // Works out which event version should be handled next for the aggregate:
    // 1 when nothing has been handled yet, otherwise the single version that
    // directly follows a handled version without being handled itself.
    static int GetExpectedEventVersion(EventAggregate eventAggregate, Guid aggregateRootId, int receivedVersion)
    {
        var handledVersions = eventAggregate.Events
            .Where(e => e.Handled)
            .Select(e => e.EventVersion)
            .ToList();

        // Every "handled version + 1" that has not been handled itself.
        var candidates = handledVersions
            .Select(version => version + 1)
            .Except(handledVersions)
            .ToList();

        if (candidates.Count == 0)
        {
            return 1;
        }

        if (candidates.Count > 1)
        {
            // More than one candidate means the handled versions are not contiguous.
            // Report the lowest candidate as the expected version: the order of
            // "candidates" follows arrival order, so its last element is arbitrary.
            throw new InvalidOperationException(string.Format("There is a hole in the event sequence for aggregate {0}. I expected version {1}, but received version {2}. I'm missing {3}", aggregateRootId, candidates.Min(), receivedVersion, string.Join(", ", candidates)));
        }

        return candidates[0];
    }

    // Sends the current message to the back of the queue and prevents the
    // remaining handlers from seeing it in this pass.
    void DeferCurrentMessage()
    {
        bus.HandleCurrentMessageLater();
        bus.DoNotContinueDispatchingCurrentMessageToHandlers();
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build the document store from the connection string configured for the read model.
var store = new DocumentStore();
store.ConnectionStringName = ConnectionBuilder.GetConnectionStringSetting(DataStore.ReadModel).Name;
store.Initialize();

// Fixed resource manager id, assigned after initialization.
// NOTE(review): presumably kept constant so distributed transactions can be
// recovered across restarts - confirm.
store.ResourceManagerId = new Guid("67806EE3-B593-4581-90D5-A7FC0966C4F2");
store.DatabaseCommands.EnsureDatabaseExists(store.DefaultDatabase);

// Container registrations: one shared store, a session opened from that store
// each time an IDocumentSession is resolved, and the Raven-backed unit of work.
For<IDocumentStore>().Singleton().Use(store);
For<IDocumentSession>().Use(context => context.GetInstance<IDocumentStore>().OpenSession());
For<IManageUnitsOfWork>().Use<RavenUnitOfWork>();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using MyNamespace.Messages.Events; | |
using Raven.Client; | |
namespace MyNamespace.ReadModel.Data.Models | |
{ | |
/// <summary>
/// Helpers for addressing documents by a <see cref="Guid"/> combined with the
/// document type name.
/// </summary>
public static class RavenExtensions
{
    /// <summary>
    /// Loads the <typeparamref name="T"/> document whose id is derived from
    /// <paramref name="guid"/> by <see cref="GetId{T}"/>.
    /// </summary>
    /// <param name="session">Session to load from.</param>
    /// <param name="guid">Identifier used to build the document id.</param>
    /// <returns>The result of <c>session.Load</c> for the derived id.</returns>
    public static T LoadWithId<T>(this IDocumentSession session, Guid guid)
    {
        string documentId = guid.GetId<T>();
        return session.Load<T>(documentId);
    }

    /// <summary>
    /// Builds a document id of the form <c>"{type name}/{guid}"</c>, e.g.
    /// <c>EventAggregate/67806ee3-b593-4581-90d5-a7fc0966c4f2</c>.
    /// </summary>
    /// <param name="guid">Identifier forming the second part of the id.</param>
    /// <returns>The short name of <typeparamref name="T"/>, a slash, and the default string form of the guid.</returns>
    public static string GetId<T>(this Guid guid)
    {
        return string.Concat(typeof(T).Name, "/", guid.ToString());
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using NServiceBus.UnitOfWork; | |
using Raven.Client; | |
namespace MyNamespace.Core.Messaging.NServiceBusIntegration | |
{ | |
/// <summary>
/// Unit of work that saves the document session's pending changes when a
/// message has been processed without an exception.
/// </summary>
public class RavenUnitOfWork : IManageUnitsOfWork
{
    readonly IDocumentSession session;

    /// <summary>Creates the unit of work around the given session.</summary>
    /// <param name="session">Session whose changes are saved in <see cref="End"/>.</param>
    public RavenUnitOfWork(IDocumentSession session)
    {
        this.session = session;
    }

    /// <summary>No work is needed when the unit of work starts.</summary>
    public void Begin()
    {
    }

    /// <summary>
    /// Saves the session's changes, unless an exception was reported.
    /// </summary>
    /// <param name="ex">The reported exception, or <c>null</c> when there is none.</param>
    public void End(Exception ex)
    {
        // A reported failure means nothing is saved.
        if (ex != null)
        {
            return;
        }

        session.SaveChanges();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment