Skip to content

Instantly share code, notes, and snippets.

@mikeeast
Last active December 17, 2015 08:49
Show Gist options
  • Save mikeeast/5582381 to your computer and use it in GitHub Desktop.
Save mikeeast/5582381 to your computer and use it in GitHub Desktop.
This is how I perform the poor man's message reordering.
using System;
namespace MyNamespace.ReadModel.Data.Models
{
public class Event
{
public int EventVersion { get; set; }
public int CommitVersion { get; set; }
public DateTime ReceivedDate { get; set; }
public DateTime EventDate { get; set; }
public string UserName { get; set; }
public string EventType { get; set; }
public string Payload { get; set; }
public int Retries { get; set; }
public bool Handled { get; set; }
}
}
using System;
using System.Collections.Generic;
namespace MyNamespace.ReadModel.Data.Models
{
public class EventAggregate : Dto
{
public IList<Event> Events { get; private set; }
public EventAggregate(Guid aggregateRootId)
: base(aggregateRootId)
{
Events = new List<Event>();
}
}
}
using System;
using MyNamespace.ReadModel.Data.Models;
using Raven.Client;
using Raven.Client.Linq;
namespace MyNamespace.ReadModel.Web.EventHandlers
{
public class EventHandlerBase<T>
{
protected T Load(IDocumentSession session, Guid id)
{
return session.LoadWithId<T>(id);
}
}
}
using System;
using System.Linq;
using System.Threading;
using NServiceBus;
using MyNamespace.Core.Extensions;
using MyNamespace.Core.Support.Logging;
using MyNamespace.Messages.Infrastructure;
using MyNamespace.ReadModel.Data.Models;
using Raven.Abstractions.Exceptions;
using Raven.Client;
using Raven.Client.Document;
using log4net;
namespace MyNamespace.ReadModel.EventHandlers
{
public class EventVersionOrderingHandler :
EventHandlerBase<EventAggregate>,
IHandleMessages<IEvent>
{
readonly IDocumentSession session;
readonly ICreateEventContexts eventContextFactory;
readonly IBus bus;
public EventVersionOrderingHandler(IDocumentSession session, ICreateEventContexts eventContextFactory, IBus bus)
{
this.session = session;
this.eventContextFactory = eventContextFactory;
this.bus = bus;
Log = NullLogger.Instance;
}
public ILog Log { get; set; }
public void Handle(IEvent message)
{
session.Advanced.AllowNonAuthoritativeInformation = false;
session.Advanced.UseOptimisticConcurrency = true;
var eventContext = eventContextFactory.Create(message);
var eventAggregate = Load(session, eventContext.AggregateRootId);
if (eventAggregate == null)
{
eventAggregate = new EventAggregate(eventContext.AggregateRootId);
session.Store(eventAggregate);
}
var etag = session.Advanced.GetEtagFor(eventAggregate);
var events = eventAggregate.Events
.Where(e => e.Handled)
.ToList();
var eventSequence = events
.Select(e => e.EventVersion + 1)
.Except(events.Select(e => e.EventVersion))
.ToList();
var expectedEventVersion = 1;
if (eventSequence.Any())
{
expectedEventVersion = eventSequence.Last();
if (eventSequence.Count() > 1)
{
throw new Exception(string.Format("There is a hole in the event sequence for aggregate {0}. I expected version {1}, but received version {2}. I'm missing {3}", eventContext.AggregateRootId, eventSequence.Last(), eventContext.EventVersion, string.Join(", ", eventSequence)));
}
}
var evt = eventAggregate.Events
.SingleOrDefault(e => e.EventVersion == eventContext.EventVersion);
if (evt == null)
{
evt = new Event
{
EventVersion = eventContext.EventVersion,
ReceivedDate = DateTime.UtcNow,
EventDate = eventContext.EventDate.GetValueOrDefault(),
UserName = eventContext.UserName,
CommitVersion = eventContext.CommitVersion,
EventType = message.GetType().FullName,
Payload = message.AsJson(),
Retries = 0,
Handled = false
};
eventAggregate.Events.Add(evt);
}
if (expectedEventVersion != eventContext.EventVersion)
{
if (evt.Retries > 10)
{
throw new Exception(string.Format("I have retried this message aggregate '{0}' the maximum number of times ({1}) and will throw it out to be handled. I expected version {2}, but received version {3}.", eventContext.AggregateRootId, evt.Retries, expectedEventVersion, eventContext.EventVersion));
}
Log.Info(string.Format("I will not handle this message for aggregate '{0}'. I expected version {1}, but received version {2}. Sending to back of queue.", eventContext.AggregateRootId, expectedEventVersion, eventContext.EventVersion));
bus.HandleCurrentMessageLater();
bus.DoNotContinueDispatchingCurrentMessageToHandlers();
evt.Retries++;
}
else
{
evt.Handled = true;
}
try
{
session.Store(eventAggregate, etag.GetValueOrDefault());
}
catch (ConcurrencyException concurrencyException)
{
Log.Warn("A concurrency exception occurred when ordering messages", concurrencyException);
bus.HandleCurrentMessageLater();
bus.DoNotContinueDispatchingCurrentMessageToHandlers();
}
}
}
}
var store = new DocumentStore
{
ConnectionStringName = ConnectionBuilder.GetConnectionStringSetting(DataStore.ReadModel).Name
};
store.Initialize();
store.ResourceManagerId = new Guid("67806EE3-B593-4581-90D5-A7FC0966C4F2");
store.DatabaseCommands.EnsureDatabaseExists(store.DefaultDatabase);
For<IDocumentStore>().Singleton().Use(store);
For<IDocumentSession>().Use(ctx => ctx.GetInstance<IDocumentStore>().OpenSession());
For<IManageUnitsOfWork>().Use<RavenUnitOfWork>();
using System;
using MyNamespace.Messages.Events;
using Raven.Client;
namespace MyNamespace.ReadModel.Data.Models
{
public static class RavenExtensions
{
public static T LoadWithId<T>(this IDocumentSession session, Guid guid)
{
return session.Load<T>(guid.GetId<T>());
}
public static string GetId<T>(this Guid guid)
{
return typeof(T).Name + "/" + guid;
}
}
}
using System;
using NServiceBus.UnitOfWork;
using Raven.Client;
namespace MyNamespace.Core.Messaging.NServiceBusIntegration
{
public class RavenUnitOfWork : IManageUnitsOfWork
{
readonly IDocumentSession session;
public RavenUnitOfWork(IDocumentSession session)
{
this.session = session;
}
public void Begin()
{
}
public void End(Exception ex)
{
if (ex == null)
session.SaveChanges();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment