Skip to content

Instantly share code, notes, and snippets.

@danielmarbach
Created July 22, 2013 17:56
Show Gist options
  • Save danielmarbach/6056018 to your computer and use it in GitHub Desktop.
Save danielmarbach/6056018 to your computer and use it in GitHub Desktop.
My InMemory approach for unit testing and acceptance testing with NServiceBus which uses custom IBus, FluentAssertions and NUnit actions
[assembly: WithBus]
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.MessageInterfaces.MessageMapper.Reflection;
using NServiceBus.Saga;
public class Bus : IBus
{
private readonly Queue<object> publishedMessages = new Queue<object>();
private readonly Queue<object> sentMessages = new Queue<object>();
private readonly Queue<object> localMessages = new Queue<object>();
private readonly Queue<object> repliedMessages = new Queue<object>();
private readonly Queue<object> returnedMessages = new Queue<object>();
private readonly Queue<DeferredMessage> deferredMessages = new Queue<DeferredMessage>();
private readonly List<Tuple<Func<object>, Func<object, bool>>> sentLocalHandlers = new List<Tuple<Func<object>, Func<object, bool>>>();
private readonly List<Tuple<Func<object>, Func<object, bool>>> deferHandlers = new List<Tuple<Func<object>, Func<object, bool>>>();
private readonly List<Action<IEnumerable<object>, Callback>> registeredCallbacks = new List<Action<IEnumerable<object>, Callback>>();
private readonly IDictionary<object, IDictionary<string, string>> messageHeaders = new Dictionary<object, IDictionary<string, string>>();
public Bus()
{
this.OutgoingHeaders = new Dictionary<string, string>();
this.CurrentMessageContext = new MessageContext();
ExtensionMethods.SetHeaderAction = this.SetHeader;
MessageConventionExtensions.IsMessageTypeAction = // your conventions here;
MessageConventionExtensions.IsCommandTypeAction = // your conventions here;
MessageConventionExtensions.IsEventTypeAction = // your conventions here;
}
public IDictionary<string, string> OutgoingHeaders { get; private set; }
public IMessageContext CurrentMessageContext
{
get; private set;
}
public IInMemoryOperations InMemory { get; private set; }
public IEnumerable<object> Published
{
get
{
return this.publishedMessages;
}
}
public IEnumerable<object> Sent
{
get
{
return this.sentMessages;
}
}
public IEnumerable<object> Local
{
get
{
return this.localMessages;
}
}
public IEnumerable<object> Replied
{
get
{
return this.repliedMessages;
}
}
public IEnumerable<object> Returned
{
get
{
return this.returnedMessages;
}
}
public IEnumerable<DeferredMessage> DeferredMessages
{
get { return this.deferredMessages; }
}
public bool HandleCurrentMessageLaterWasCalled { get; set; }
public bool DoNotContinueDispatchingCurrentMessageToHandlersWasCalled { get; set; }
public string GetHeader(object message, string key)
{
return this.messageHeaders[message][key];
}
public void SetupForSendLocal<TMessage>(
Func<IHandleMessages<TMessage>> handlerCreator)
where TMessage : class
{
this.sentLocalHandlers.Add(
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
}
public void SetupForDefer<TMessage>(
Func<IHandleMessages<TMessage>> handlerCreator)
where TMessage : class
{
this.deferHandlers.Add(
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
}
public TSaga NewSaga<TSaga>()
where TSaga : ISaga
{
return NewSaga(() => (TSaga)Activator.CreateInstance(typeof(TSaga)));
}
public TSaga NewSaga<TSaga>(Func<TSaga> creationCallback)
where TSaga : ISaga
{
var saga = creationCallback();
saga.Bus = this;
var sagaType = typeof(TSaga).GetInterfaces()
.FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISaga<>));
if (sagaType != null)
{
var sagaEntityType = sagaType.GetGenericArguments().Single();
typeof(TSaga).GetProperty("Data", BindingFlags.Instance | BindingFlags.Public)
.SetValue(saga, Activator.CreateInstance(sagaEntityType));
typeof(TSaga).GetProperty("SagaMessageFindingConfiguration", BindingFlags.Instance | BindingFlags.Public)
.SetValue(saga, new FakeSagaConfigurator());
}
((IConfigurable)saga).Configure();
return saga;
}
public void SetupForTimeout<TMessage>(
Func<IHandleTimeouts<TMessage>> handlerCreator)
where TMessage : class
{
this.deferHandlers.Add(
new Tuple<Func<object>, Func<object, bool>>(
handlerCreator, msg => (msg as TMessage) != null));
}
public void SetupAnswerForRegisterCallback(Action<IEnumerable<object>, Callback> assignResultCallback)
{
this.registeredCallbacks.Add(assignResultCallback);
}
public void Clear()
{
this.publishedMessages.Clear();
this.sentMessages.Clear();
this.localMessages.Clear();
this.repliedMessages.Clear();
this.deferredMessages.Clear();
this.returnedMessages.Clear();
this.sentLocalHandlers.Clear();
this.deferHandlers.Clear();
this.registeredCallbacks.Clear();
this.HandleCurrentMessageLaterWasCalled = false;
this.DoNotContinueDispatchingCurrentMessageToHandlersWasCalled = false;
this.OutgoingHeaders.Clear();
this.CurrentMessageContext = new MessageContext();
}
public T CreateInstance<T>()
{
var mm = CreateMessageMapper<T>();
return mm.CreateInstance<T>();
}
public T CreateInstance<T>(Action<T> action)
{
var message = this.CreateInstance<T>();
action(message);
return message;
}
public object CreateInstance(Type messageType)
{
return this.GetType().GetMethod("CreateInstance", new[] { messageType }).Invoke(this, null);
}
public void Publish<T>(params T[] messages)
{
foreach (var message in messages)
{
this.publishedMessages.Enqueue(message);
}
}
public void Publish<T>(Action<T> messageConstructor)
{
var message = this.CreateInstance(messageConstructor);
this.publishedMessages.Enqueue(message);
}
public void Subscribe(Type messageType)
{
}
public void Subscribe<T>()
{
}
public void Subscribe(Type messageType, Predicate<object> condition)
{
}
public void Subscribe<T>(Predicate<T> condition)
{
}
public void Unsubscribe(Type messageType)
{
}
public void Unsubscribe<T>()
{
}
public ICallback SendLocal(params object[] messages)
{
foreach (var message in messages)
{
this.localMessages.Enqueue(message);
this.DynamicInvokeHandle(message, this.sentLocalHandlers);
}
return new Callback(messages, this.registeredCallbacks);
}
public ICallback SendLocal<T>(Action<T> messageConstructor)
{
var message = this.CreateInstance(messageConstructor);
this.localMessages.Enqueue(message);
this.DynamicInvokeHandle(message, this.sentLocalHandlers);
return new Callback(new object[] { message }, this.registeredCallbacks);
}
public ICallback Send(params object[] messages)
{
foreach (var message in messages)
{
this.sentMessages.Enqueue(message);
}
return new Callback(messages, this.registeredCallbacks);
}
public ICallback Send<T>(Action<T> messageConstructor)
{
var message = this.CreateInstance(messageConstructor);
this.sentMessages.Enqueue(message);
return new Callback(new object[] { message }, this.registeredCallbacks);
}
public ICallback Send(string destination, params object[] messages)
{
DoNotCallThisMethod();
return new Callback(messages, this.registeredCallbacks);
}
public ICallback Send(Address address, params object[] messages)
{
DoNotCallThisMethod();
return new Callback(messages, this.registeredCallbacks);
}
public ICallback Send<T>(string destination, Action<T> messageConstructor)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback Send<T>(Address address, Action<T> messageConstructor)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback Send(string destination, string correlationId, params object[] messages)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback Send(Address address, string correlationId, params object[] messages)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback Send<T>(string destination, string correlationId, Action<T> messageConstructor)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback Send<T>(Address address, string correlationId, Action<T> messageConstructor)
{
DoNotCallThisMethod();
return new Callback(new object[] { }, this.registeredCallbacks);
}
public ICallback SendToSites(IEnumerable<string> siteKeys, params object[] messages)
{
throw new NotImplementedException();
}
public ICallback Defer(TimeSpan delay, params object[] messages)
{
this.deferredMessages.Enqueue(new DeferredMessage(messages.ToList(), delay));
foreach (var message in messages)
{
this.DynamicInvokeTimeout(message, this.deferHandlers);
}
return new Callback(messages, this.registeredCallbacks);
}
public ICallback Defer(DateTime processAt, params object[] messages)
{
this.deferredMessages.Enqueue(new DeferredMessage(messages, processAt));
foreach (var message in messages)
{
this.DynamicInvokeTimeout(message, this.deferHandlers);
}
return new Callback(messages, this.registeredCallbacks);
}
public void Reply(params object[] messages)
{
foreach (var message in messages)
{
this.repliedMessages.Enqueue(message);
}
}
public void Reply<T>(Action<T> messageConstructor)
{
var message = this.CreateInstance(messageConstructor);
this.repliedMessages.Enqueue(message);
}
public void Return<T>(T errorEnum)
{
this.returnedMessages.Enqueue(errorEnum);
}
public void HandleCurrentMessageLater()
{
this.HandleCurrentMessageLaterWasCalled = true;
}
public void ForwardCurrentMessageTo(string destination)
{
throw new NotImplementedException();
}
public void DoNotContinueDispatchingCurrentMessageToHandlers()
{
this.DoNotContinueDispatchingCurrentMessageToHandlersWasCalled = true;
}
public void Shutdown()
{
}
private static MessageMapper CreateMessageMapper<T>()
{
var messageMapper = new MessageMapper();
messageMapper.Initialize(new[] { typeof(T) });
return messageMapper;
}
private static void DoNotCallThisMethod([CallerMemberName] string memberName = "")
{
throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "You should not use {0} in production.", memberName));
}
private void SetHeader(object message, string key, string value)
{
var keyValuePair = new KeyValuePair<string, string>(key, value);
if (this.messageHeaders.ContainsKey(message))
{
var headers = this.messageHeaders[message];
headers.Add(keyValuePair);
}
else
{
IDictionary<string, string> dictionary = new Dictionary<string, string>();
dictionary.Add(keyValuePair);
this.messageHeaders.Add(message, dictionary);
}
}
private void DynamicInvokeHandle(object message, IEnumerable<Tuple<Func<object>, Func<object, bool>>> handlers)
{
foreach (var tuple in handlers)
{
if (tuple.Item2(message))
{
var handler = tuple.Item1();
var method = handler.GetType().GetMethod("Handle", new[] { message.GetType() });
method.Invoke(handler, new[] { message });
}
}
}
private void DynamicInvokeTimeout(object message, IEnumerable<Tuple<Func<object>, Func<object, bool>>> handlers)
{
foreach (var tuple in handlers)
{
if (tuple.Item2(message))
{
var handler = tuple.Item1();
var method = handler.GetType().GetMethod("Timeout", new[] { message.GetType() });
method.Invoke(handler, new[] { message });
}
}
}
private class MessageContext : IMessageContext
{
public MessageContext()
{
this.Headers = new Dictionary<string, string>();
this.Id = Guid.NewGuid().ToString();
this.ReturnAddress = "localhost";
this.ReplyToAddress = Address.Parse("localhost");
this.TimeSent = DateTime.UtcNow;
}
public string Id { get; private set; }
public string ReturnAddress { get; private set; }
public Address ReplyToAddress { get; private set; }
public DateTime TimeSent { get; private set; }
public IDictionary<string, string> Headers { get; private set; }
}
}
/// <summary>
/// This is a very naive and first implementation of ICallback
/// </summary>
public class Callback : ICallback, IAsyncResult
{
private readonly IEnumerable<object> messages;
private readonly IEnumerable<Action<IEnumerable<object>, Callback>> callbacks;
public Callback(IEnumerable<object> messages, IEnumerable<Action<IEnumerable<object>, Callback>> callbacks)
{
this.messages = messages;
this.callbacks = callbacks;
}
public bool IsCompleted { get; private set; }
public WaitHandle AsyncWaitHandle { get; private set; }
public object AsyncState { get; set; }
public bool CompletedSynchronously { get; private set; }
public Task<int> Register()
{
throw new NotImplementedException();
}
public Task<T> Register<T>()
{
throw new NotImplementedException();
}
public Task<T> Register<T>(Func<CompletionResult, T> completion)
{
throw new NotImplementedException();
}
public Task Register(Action<CompletionResult> completion)
{
throw new NotImplementedException();
}
public IAsyncResult Register(AsyncCallback callback, object state)
{
this.AsyncState = null;
this.IsCompleted = true;
this.CompletedSynchronously = true;
this.AsyncWaitHandle = new AutoResetEvent(true);
foreach (Action<IEnumerable<object>, Callback> c in this.callbacks)
{
c(this.messages, this);
}
callback(this);
return this;
}
public void Register<T>(Action<T> callback)
{
callback(default(T));
}
public void Register<T>(Action<T> callback, object synchronizer)
{
callback(default(T));
}
}
public class FakeSagaConfigurator : List<Type>, IConfigureHowToFindSagaWithMessage
{
public void ConfigureMapping<TSagaEntity, TMessage>(Expression<Func<TSagaEntity, object>> sagaEntityProperty, Expression<Func<TMessage, object>> messageProperty) where TSagaEntity : IContainSagaData
{
Func<TMessage, object> func = messageProperty.Compile();
var type = func.GetType().GetGenericArguments()[0];
this.Add(type);
}
}
public class DeferredMessage : List<object>
{
public DeferredMessage(IEnumerable<object> message, DateTime excecutionTime)
{
this.AddRange(message);
this.ExecutionDate = excecutionTime;
}
public DeferredMessage(IEnumerable<object> message, TimeSpan executionTimeSpan)
{
this.AddRange(message);
this.ExecutionTimeSpan = executionTimeSpan;
}
public DateTime? ExecutionDate { get; set; }
public TimeSpan? ExecutionTimeSpan { get; set; }
}
[WithBus]
public interface IWantBus
{
Bus Bus { get; set; }
}
public class MessageSchemaValidator
{
private readonly StringBuilder errorMessage = new StringBuilder();
private bool failed;
public void ValidateMessageSchema<T>(Action<T> messageInitializer, params string[] schemas)
{
var messageMapper = CreateMessageMapper<T>();
var message = messageMapper.CreateInstance(messageInitializer);
this.ValidateMessageSchema(message, schemas);
}
public void ValidateMessageSchema<T>(T message, params string[] schemas)
{
var messageMapper = CreateMessageMapper<T>();
var xmlSerializer = CreateXmlMessageSerializer<T>(messageMapper);
using (var memoryStream = new MemoryStream())
{
xmlSerializer.Serialize(new object[] { message }, memoryStream);
memoryStream.Position = 0;
var settings = new XmlReaderSettings();
settings.ValidationType = ValidationType.Schema;
settings.ValidationEventHandler += this.ValidationHandler;
foreach (var schema in schemas)
{
settings.Schemas.Add(this.LoadSchema(schema));
}
string xmlMessage = Encoding.UTF8.GetString(memoryStream.ToArray());
this.ValidateNamespace<T>(schemas, xmlMessage);
var xmlTextReader = XmlReader.Create(memoryStream, settings);
while (xmlTextReader.Read())
{
}
if (this.failed)
{
this.ConsoleWriteSchemas<T>(schemas);
Console.WriteLine(xmlMessage);
Assert.Fail(this.errorMessage.ToString());
}
}
}
private void ConsoleWriteSchemas<T>(IEnumerable<string> schemas)
{
foreach (var schema in schemas)
{
Console.WriteLine(schema);
}
}
private void ValidateNamespace<T>(string[] schemas, string xmlMessage)
{
XmlDocument document = new XmlDocument();
document.LoadXml(xmlMessage);
XmlSchema xmlSchema = this.LoadSchema(schemas[schemas.Length - 1]);
var schemaNamespace = xmlSchema.TargetNamespace;
Assert.AreEqual(schemaNamespace, document.DocumentElement.NamespaceURI, "Invalid namespace!");
}
private void ValidationHandler(object sender, ValidationEventArgs args)
{
if (args.Severity == XmlSeverityType.Error)
{
this.failed = true;
this.errorMessage.AppendLine(args.Message);
}
}
private XmlSchema LoadSchema(string schema)
{
using (var stringReader = new StringReader(schema))
{
return XmlSchema.Read(stringReader, this.ValidationHandler);
}
}
private static XmlMessageSerializer CreateXmlMessageSerializer<T>(MessageMapper messageMapper)
{
var xmlSerializer = new XmlMessageSerializer(messageMapper)
{
Namespace = // Your namespace,
// more config on the serializer
};
xmlSerializer.Initialize(new[] { typeof(T) });
return xmlSerializer;
}
private static MessageMapper CreateMessageMapper<T>()
{
var messageMapper = new MessageMapper();
messageMapper.Initialize(new[] { typeof(T) });
return messageMapper;
}
}
public static class SagaExtensions
{
public static void BeMappedCorrectly(this ObjectAssertions actualValue)
{
actualValue.Subject.Should().BeAssignableTo<ISaga>();
dynamic dynamicSaga = actualValue.Subject;
// This is an evil hack which assumes that this saga has been instantiated with the fake bus.
var mappedTypes = (IEnumerable<Type>)dynamicSaga.SagaMessageFindingConfiguration;
var handleParameterTypes =
actualValue.Subject.GetType()
.GetMethods(BindingFlags.Instance | BindingFlags.Public)
.Where(m => m.Name == "Handle")
.SelectMany(m => m.GetParameters())
.Select(p => p.ParameterType)
.Distinct();
handleParameterTypes.Should().BeEquivalentTo(
mappedTypes, "all IHandler have to be mapped");
}
}
this.bus = new Bus();
this.testee = this.bus.NewSaga(() => new YourSage(YourDependencies));
[Test]
public void SagaCorrectlyMapped()
{
this.testee.Should().BeMappedCorrectly();
}
[Test]
public void SomeTest
{
var inputMessage = this.Bus.CreateInstance<InputMessage>();
inputMessage.InputProperty = -1;
inputMessage.SomeOtherInputProperty = "SomeValue";
this.testee.Handle(inputMessage);
this.bus.Local.OfType<YourMessage>().Should().ContainSingle(
c =>
c.SomeProperty == -1
&& c.SomeOtherProperty == "SomeValue");
}
saga = NewSaga<SomeSagaWhichCanTimeout>();
bus.SetupForTimeout<TimeoutMessage>(() => saga);
sagaWhichDoesASendLocal = NewSaga<SagaWhichDoesASendLocal>();
sagaWhichGetsCreatedOnSendLocal = NewSaga<SagaWhichGetsCreatedOnSendLocal>();
bus.SetupForSendLocal<MessageWhichIsSentLocal>(() => sagaWhichGetsCreatedOnSendLocal);
// Some NUnit magic
public class WithBusAttribute : Attribute, ITestAction
{
// Might need to improve that with ThreadLocal for parallel execution
private static Bus bus;
public ActionTargets Targets
{
get
{
return ActionTargets.Test | ActionTargets.Suite;
}
}
public void BeforeTest(TestDetails testDetails)
{
if (testDetails.IsSuite)
{
bus = new Bus();
}
var fixture = testDetails.Fixture as IWantBus;
if (fixture == null)
{
return;
}
fixture.Bus = bus;
}
public void AfterTest(TestDetails testDetails)
{
bus.Clear();
}
}
public static class XmlValidationExtensions
{
public static void BeValidMessage<T>(this ObjectAssertions assertions, params string[] schemas)
{
dynamic message = assertions.Subject;
var validator = new MessageSchemaValidator();
validator.ValidateMessageSchema<T>(message, schemas);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment