Skip to content

Instantly share code, notes, and snippets.

Created July 22, 2013 17:56
Show Gist options
  • Save danielmarbach/6056018 to your computer and use it in GitHub Desktop.
Save danielmarbach/6056018 to your computer and use it in GitHub Desktop.
My InMemory approach for unit testing and acceptance testing with NServiceBus which uses custom IBus, FluentAssertions and NUnit actions
[assembly: WithBus]
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.MessageInterfaces.MessageMapper.Reflection;
using NServiceBus.Saga;
public class Bus : IBus
private readonly Queue<object> publishedMessages = new Queue<object>();
private readonly Queue<object> sentMessages = new Queue<object>();
private readonly Queue<object> localMessages = new Queue<object>();
private readonly Queue<object> repliedMessages = new Queue<object>();
private readonly Queue<object> returnedMessages = new Queue<object>();
private readonly Queue<DeferredMessage> deferredMessages = new Queue<DeferredMessage>();
private readonly List<Tuple<Func<object>, Func<object, bool>>> sentLocalHandlers = new List<Tuple<Func<object>, Func<object, bool>>>();
private readonly List<Tuple<Func<object>, Func<object, bool>>> deferHandlers = new List<Tuple<Func<object>, Func<object, bool>>>();
private readonly List<Action<IEnumerable<object>, Callback>> registeredCallbacks = new List<Action<IEnumerable<object>, Callback>>();
private readonly IDictionary<object, IDictionary<string, string>> messageHeaders = new Dictionary<object, IDictionary<string, string>>();
public Bus()
this.OutgoingHeaders = new Dictionary<string, string>();
this.CurrentMessageContext = new MessageContext();
ExtensionMethods.SetHeaderAction = this.SetHeader;
MessageConventionExtensions.IsMessageTypeAction = // your conventions here;
MessageConventionExtensions.IsCommandTypeAction = // your conventions here;
MessageConventionExtensions.IsEventTypeAction = // your conventions here;
public IDictionary<string, string> OutgoingHeaders { get; private set; }
public IMessageContext CurrentMessageContext
get; private set;
public IInMemoryOperations InMemory { get; private set; }
public IEnumerable<object> Published
return this.publishedMessages;
public IEnumerable<object> Sent
return this.sentMessages;
public IEnumerable<object> Local
return this.localMessages;
public IEnumerable<object> Replied
return this.repliedMessages;
public IEnumerable<object> Returned
return this.returnedMessages;
public IEnumerable<DeferredMessage> DeferredMessages
get { return this.deferredMessages; }
public bool HandleCurrentMessageLaterWasCalled { get; set; }
public bool DoNotContinueDispatchingCurrentMessageToHandlersWasCalled { get; set; }
public string GetHeader(object message, string key)
return this.messageHeaders[message][key];
public void SetupForSendLocal<TMessage>(
Func<IHandleMessages<TMessage>> handlerCreator)
where TMessage : class
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
public void SetupForDefer<TMessage>(
Func<IHandleMessages<TMessage>> handlerCreator)
where TMessage : class
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
public TSaga NewSaga<TSaga>()
where TSaga : ISaga
return NewSaga(() => (TSaga)Activator.CreateInstance(typeof(TSaga)));
public TSaga NewSaga<TSaga>(Func<TSaga> creationCallback)
where TSaga : ISaga
var saga = creationCallback();
saga.Bus = this;
var sagaType = typeof(TSaga).GetInterfaces()
.FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISaga<>));
if (sagaType != null)
var sagaEntityType = sagaType.GetGenericArguments().Single();
typeof(TSaga).GetProperty("Data", BindingFlags.Instance | BindingFlags.Public)
.SetValue(saga, Activator.CreateInstance(sagaEntityType));
typeof(TSaga).GetProperty("SagaMessageFindingConfiguration", BindingFlags.Instance | BindingFlags.Public)
.SetValue(saga, new FakeSagaConfigurator());
return saga;
public void SetupForTimeout<TMessage>(
Func<IHandleTimeouts<TMessage>> handlerCreator)
where TMessage : class
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
public void SetupAnswerForRegisterCallback(Action<IEnumerable<object>, Callback> assignResultCallback)
public void Clear()
this.HandleCurrentMessageLaterWasCalled = false;
this.DoNotContinueDispatchingCurrentMessageToHandlersWasCalled = false;
this.CurrentMessageContext = new MessageContext();
public T CreateInstance<T>()
var mm = CreateMessageMapper<T>();
return mm.CreateInstance<T>();
public T CreateInstance<T>(Action<T> action)
var message = this.CreateInstance<T>();
return message;
public object CreateInstance(Type messageType)
return this.GetType().GetMethod("CreateInstance", new[] { messageType }).Invoke(this, null);
public void Publish<T>(params T[] messages)
foreach (var message in messages)
public void Publish<T>(Action<T> messageConstructor)
var message = this.CreateInstance(messageConstructor);
public void Subscribe(Type messageType)
public void Subscribe<T>()
public void Subscribe(Type messageType, Predicate<object> condition)
public void Subscribe<T>(Predicate<T> condition)
public void Unsubscribe(Type messageType)
public void Unsubscribe<T>()
public ICallback SendLocal(params object[] messages)
foreach (var message in messages)
this.DynamicInvokeHandle(message, this.sentLocalHandlers);
return new Callback(messages, this.registeredCallbacks);
public ICallback SendLocal<T>(Action<T> messageConstructor)
var message = this.CreateInstance(messageConstructor);
this.DynamicInvokeHandle(message, this.sentLocalHandlers);
return new Callback(new object[] { message }, this.registeredCallbacks);
public ICallback Send(params object[] messages)
foreach (var message in messages)
return new Callback(messages, this.registeredCallbacks);
public ICallback Send<T>(Action<T> messageConstructor)
var message = this.CreateInstance(messageConstructor);
return new Callback(new object[] { message }, this.registeredCallbacks);
public ICallback Send(string destination, params object[] messages)
return new Callback(messages, this.registeredCallbacks);
public ICallback Send(Address address, params object[] messages)
return new Callback(messages, this.registeredCallbacks);
public ICallback Send<T>(string destination, Action<T> messageConstructor)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback Send<T>(Address address, Action<T> messageConstructor)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback Send(string destination, string correlationId, params object[] messages)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback Send(Address address, string correlationId, params object[] messages)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback Send<T>(string destination, string correlationId, Action<T> messageConstructor)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback Send<T>(Address address, string correlationId, Action<T> messageConstructor)
return new Callback(new object[] { }, this.registeredCallbacks);
public ICallback SendToSites(IEnumerable<string> siteKeys, params object[] messages)
throw new NotImplementedException();
public ICallback Defer(TimeSpan delay, params object[] messages)
this.deferredMessages.Enqueue(new DeferredMessage(messages.ToList(), delay));
foreach (var message in messages)
this.DynamicInvokeTimeout(message, this.deferHandlers);
return new Callback(messages, this.registeredCallbacks);
public ICallback Defer(DateTime processAt, params object[] messages)
this.deferredMessages.Enqueue(new DeferredMessage(messages, processAt));
foreach (var message in messages)
this.DynamicInvokeTimeout(message, this.deferHandlers);
return new Callback(messages, this.registeredCallbacks);
public void Reply(params object[] messages)
foreach (var message in messages)
public void Reply<T>(Action<T> messageConstructor)
var message = this.CreateInstance(messageConstructor);
public void Return<T>(T errorEnum)
public void HandleCurrentMessageLater()
this.HandleCurrentMessageLaterWasCalled = true;
public void ForwardCurrentMessageTo(string destination)
throw new NotImplementedException();
public void DoNotContinueDispatchingCurrentMessageToHandlers()
this.DoNotContinueDispatchingCurrentMessageToHandlersWasCalled = true;
public void Shutdown()
private static MessageMapper CreateMessageMapper<T>()
var messageMapper = new MessageMapper();
messageMapper.Initialize(new[] { typeof(T) });
return messageMapper;
private static void DoNotCallThisMethod([CallerMemberName] string memberName = "")
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "You should not use {0} in production.", memberName));
private void SetHeader(object message, string key, string value)
var keyValuePair = new KeyValuePair<string, string>(key, value);
if (this.messageHeaders.ContainsKey(message))
var headers = this.messageHeaders[message];
IDictionary<string, string> dictionary = new Dictionary<string, string>();
this.messageHeaders.Add(message, dictionary);
private void DynamicInvokeHandle(object message, IEnumerable<Tuple<Func<object>, Func<object, bool>>> handlers)
foreach (var tuple in handlers)
if (tuple.Item2(message))
var handler = tuple.Item1();
var method = handler.GetType().GetMethod("Handle", new[] { message.GetType() });
method.Invoke(handler, new[] { message });
private void DynamicInvokeTimeout(object message, IEnumerable<Tuple<Func<object>, Func<object, bool>>> handlers)
foreach (var tuple in handlers)
if (tuple.Item2(message))
var handler = tuple.Item1();
var method = handler.GetType().GetMethod("Timeout", new[] { message.GetType() });
method.Invoke(handler, new[] { message });
private class MessageContext : IMessageContext
public MessageContext()
this.Headers = new Dictionary<string, string>();
this.Id = Guid.NewGuid().ToString();
this.ReturnAddress = "localhost";
this.ReplyToAddress = Address.Parse("localhost");
this.TimeSent = DateTime.UtcNow;
public string Id { get; private set; }
public string ReturnAddress { get; private set; }
public Address ReplyToAddress { get; private set; }
public DateTime TimeSent { get; private set; }
public IDictionary<string, string> Headers { get; private set; }
/// <summary>
/// This is a very naive and first implementation of ICallback
/// </summary>
public class Callback : ICallback, IAsyncResult
private readonly IEnumerable<object> messages;
private readonly IEnumerable<Action<IEnumerable<object>, Callback>> callbacks;
public Callback(IEnumerable<object> messages, IEnumerable<Action<IEnumerable<object>, Callback>> callbacks)
this.messages = messages;
this.callbacks = callbacks;
public bool IsCompleted { get; private set; }
public WaitHandle AsyncWaitHandle { get; private set; }
public object AsyncState { get; set; }
public bool CompletedSynchronously { get; private set; }
public Task<int> Register()
throw new NotImplementedException();
public Task<T> Register<T>()
throw new NotImplementedException();
public Task<T> Register<T>(Func<CompletionResult, T> completion)
throw new NotImplementedException();
public Task Register(Action<CompletionResult> completion)
throw new NotImplementedException();
public IAsyncResult Register(AsyncCallback callback, object state)
this.AsyncState = null;
this.IsCompleted = true;
this.CompletedSynchronously = true;
this.AsyncWaitHandle = new AutoResetEvent(true);
foreach (Action<IEnumerable<object>, Callback> c in this.callbacks)
c(this.messages, this);
return this;
public void Register<T>(Action<T> callback)
public void Register<T>(Action<T> callback, object synchronizer)
public class FakeSagaConfigurator : List<Type>, IConfigureHowToFindSagaWithMessage
public void ConfigureMapping<TSagaEntity, TMessage>(Expression<Func<TSagaEntity, object>> sagaEntityProperty, Expression<Func<TMessage, object>> messageProperty) where TSagaEntity : IContainSagaData
Func<TMessage, object> func = messageProperty.Compile();
var type = func.GetType().GetGenericArguments()[0];
public class DeferredMessage : List<object>
public DeferredMessage(IEnumerable<object> message, DateTime excecutionTime)
this.ExecutionDate = excecutionTime;
public DeferredMessage(IEnumerable<object> message, TimeSpan executionTimeSpan)
this.ExecutionTimeSpan = executionTimeSpan;
public DateTime? ExecutionDate { get; set; }
public TimeSpan? ExecutionTimeSpan { get; set; }
public interface IWantBus
Bus Bus { get; set; }
public class MessageSchemaValidator
private readonly StringBuilder errorMessage = new StringBuilder();
private bool failed;
public void ValidateMessageSchema<T>(Action<T> messageInitializer, params string[] schemas)
var messageMapper = CreateMessageMapper<T>();
var message = messageMapper.CreateInstance(messageInitializer);
this.ValidateMessageSchema(message, schemas);
public void ValidateMessageSchema<T>(T message, params string[] schemas)
var messageMapper = CreateMessageMapper<T>();
var xmlSerializer = CreateXmlMessageSerializer<T>(messageMapper);
using (var memoryStream = new MemoryStream())
xmlSerializer.Serialize(new object[] { message }, memoryStream);
memoryStream.Position = 0;
var settings = new XmlReaderSettings();
settings.ValidationType = ValidationType.Schema;
settings.ValidationEventHandler += this.ValidationHandler;
foreach (var schema in schemas)
string xmlMessage = Encoding.UTF8.GetString(memoryStream.ToArray());
this.ValidateNamespace<T>(schemas, xmlMessage);
var xmlTextReader = XmlReader.Create(memoryStream, settings);
while (xmlTextReader.Read())
if (this.failed)
private void ConsoleWriteSchemas<T>(IEnumerable<string> schemas)
foreach (var schema in schemas)
private void ValidateNamespace<T>(string[] schemas, string xmlMessage)
XmlDocument document = new XmlDocument();
XmlSchema xmlSchema = this.LoadSchema(schemas[schemas.Length - 1]);
var schemaNamespace = xmlSchema.TargetNamespace;
Assert.AreEqual(schemaNamespace, document.DocumentElement.NamespaceURI, "Invalid namespace!");
private void ValidationHandler(object sender, ValidationEventArgs args)
if (args.Severity == XmlSeverityType.Error)
this.failed = true;
private XmlSchema LoadSchema(string schema)
using (var stringReader = new StringReader(schema))
return XmlSchema.Read(stringReader, this.ValidationHandler);
private static XmlMessageSerializer CreateXmlMessageSerializer<T>(MessageMapper messageMapper)
var xmlSerializer = new XmlMessageSerializer(messageMapper)
Namespace = // Your namespace,
// more config on the serializer
xmlSerializer.Initialize(new[] { typeof(T) });
return xmlSerializer;
private static MessageMapper CreateMessageMapper<T>()
var messageMapper = new MessageMapper();
messageMapper.Initialize(new[] { typeof(T) });
return messageMapper;
public static class SagaExtensions
public static void BeMappedCorrectly(this ObjectAssertions actualValue)
dynamic dynamicSaga = actualValue.Subject;
// This is an evil hack which assumes that this saga has been instantiated with the fake bus.
var mappedTypes = (IEnumerable<Type>)dynamicSaga.SagaMessageFindingConfiguration;
var handleParameterTypes =
.GetMethods(BindingFlags.Instance | BindingFlags.Public)
.Where(m => m.Name == "Handle")
.SelectMany(m => m.GetParameters())
.Select(p => p.ParameterType)
mappedTypes, "all IHandler have to be mapped");
this.bus = new Bus();
this.testee = this.bus.NewSaga(() => new YourSage(YourDependencies));
public void SagaCorrectlyMapped()
public void SomeTest
var inputMessage = this.Bus.CreateInstance<InputMessage>();
inputMessage.InputProperty = -1;
inputMessage.SomeOtherInputProperty = "SomeValue";
c =>
c.SomeProperty == -1
&& c.SomeOtherProperty == "SomeValue");
saga = NewSaga<SomeSagaWhichCanTimeout>();
bus.SetupForTimeout<TimeoutMessage>(() => saga);
sagaWhichDoesASendLocal = NewSaga<SagaWhichDoesASendLocal>();
sagaWhichGetsCreatedOnSendLocal = NewSaga<SagaWhichGetsCreatedOnSendLocal>();
bus.SetupForSendLocal<MessageWhichIsSentLocal>(() => sagaWhichGetsCreatedOnSendLocal);
// Some NUnit magic
public class WithBusAttribute : Attribute, ITestAction
// Might need to improve that with ThreadLocal for parallel execution
private static Bus bus;
public ActionTargets Targets
return ActionTargets.Test | ActionTargets.Suite;
public void BeforeTest(TestDetails testDetails)
if (testDetails.IsSuite)
bus = new Bus();
var fixture = testDetails.Fixture as IWantBus;
if (fixture == null)
fixture.Bus = bus;
public void AfterTest(TestDetails testDetails)
public static class XmlValidationExtensions
public static void BeValidMessage<T>(this ObjectAssertions assertions, params string[] schemas)
dynamic message = assertions.Subject;
var validator = new MessageSchemaValidator();
validator.ValidateMessageSchema<T>(message, schemas);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment