Last active
October 20, 2016 10:01
-
-
Save timReynolds/e5d214ba6c0550a4a0c8470e8aa96008 to your computer and use it in GitHub Desktop.
NServiceBus Notification FailedMessageResponder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public const string FailedMessageResponderNotResponding = "No failed message response defined for {0}"; | |
public const string FailedMessageResponderResponding = "Creating {0} for failed message {1}"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Text; | |
using ASI.Logging.Contracts; | |
using ASI.Message; | |
using Newtonsoft.Json; | |
using NServiceBus; | |
using NServiceBus.Faults; | |
using PF.Message.Requests.FinancedTransactions; | |
using PF.Message.Responses.FinancedTransactions; | |
using PFBOI.Saga.Host.Constants; | |
using JsonSerializer = Newtonsoft.Json.JsonSerializer; | |
namespace PFBOI.Saga.Host.Functions | |
{ | |
public class FailedMessageResponder : IFailedMessageResponder | |
{ | |
private readonly IAsiLogger asiLogger; | |
private readonly IEndpointInstance endpointInstance; | |
private readonly Dictionary<string, Type> responseToMessages; | |
public FailedMessageResponder(IAsiLogger asiLogger, IEndpointInstance endpointInstance) | |
{ | |
this.asiLogger = asiLogger; | |
this.endpointInstance = endpointInstance; | |
responseToMessages = new Dictionary<string, Type> | |
{ | |
{typeof(FinancedTransactionsRequest).AssemblyQualifiedName, typeof(FinancedTransactionsFailedResponse)} | |
}; | |
} | |
public void HandleFailedMessage(object sender, FailedMessage failedMessage) | |
{ | |
var messageType = failedMessage.Headers[Headers.EnclosedMessageTypes]; | |
Type responseEventType; | |
if (!responseToMessages.TryGetValue(messageType, out responseEventType)) | |
{ | |
asiLogger.DebugFormat(this, HostConstants.FailedMessageResponderNotResponding, messageType); | |
return; | |
} | |
asiLogger.DebugFormat(this, HostConstants.FailedMessageResponderResponding, responseEventType, messageType); | |
var failedMessageBody = (IAsiMessage) GetMessageFromBody(failedMessage.Body, messageType); | |
var originatingEndpoint = failedMessage.Headers[Headers.OriginatingEndpoint]; | |
var originatingMachine = failedMessage.Headers[Headers.OriginatingMachine]; | |
var failedMessageEvent = (IAsiMessage) Activator.CreateInstance(responseEventType); | |
failedMessageEvent.CorrelationId = failedMessageBody.CorrelationId; | |
endpointInstance.Send($"{originatingEndpoint}@{originatingMachine}", failedMessageEvent); | |
} | |
private object GetMessageFromBody(byte[] messageBody, string messageTypeString) | |
{ | |
using (var stream = new MemoryStream(messageBody)) | |
{ | |
var reader = CreateJsonReader(stream); | |
var jsonSerializer = JsonSerializer.Create(); | |
return jsonSerializer.Deserialize(reader, Type.GetType(messageTypeString)); | |
} | |
} | |
private JsonReader CreateJsonReader(Stream stream) | |
{ | |
var streamReader = new StreamReader(stream, Encoding.UTF8); | |
return new JsonTextReader(streamReader); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Text; | |
using ASI.Logging.Contracts; | |
using Moq; | |
using Newtonsoft.Json; | |
using NServiceBus.Faults; | |
using NServiceBus.Testing; | |
using NUnit.Framework; | |
using PF.Message.Requests.FinancedTransactions; | |
using PF.Message.Responses.FinancedTransactions; | |
using PFBOI.Saga.Host.Constants; | |
using PFBOI.Saga.Host.Functions; | |
namespace PFBOI.Saga.Host.UnitTests.Functions | |
{ | |
[TestFixture] | |
public class FailedMessageResponderTests | |
{ | |
private Mock<IAsiLogger> asiLogger; | |
private TestableEndpointInstance endpointInstance; | |
private FailedMessageResponder sut; | |
[SetUp] | |
public void SetUp() | |
{ | |
asiLogger = new Mock<IAsiLogger>(); | |
endpointInstance = new TestableEndpointInstance(); | |
sut = new FailedMessageResponder(asiLogger.Object, endpointInstance); | |
} | |
[Test] | |
public void HandleFailedMessage_WhenGivenAMessageTypeThatIsNotRespondedToOnError_ShouldNotRespondToError() | |
{ | |
// Arrange | |
var messageType = "NotAMessageType"; | |
var failedMessage = CreateFailedMessage(new byte[] { }, messageType); | |
// Act | |
sut.HandleFailedMessage(new object(), failedMessage); | |
// Assert | |
asiLogger.Verify(a => a.DebugFormat(sut, HostConstants.FailedMessageResponderNotResponding, messageType), Times.Once); | |
} | |
[Test] | |
public void HandleFailedMessage_WhenGivenAMessageTypeThatIsRespondedToOnError_ShouldRespondToError() | |
{ | |
// Arrange | |
var messageType = typeof(FinancedTransactionsRequest).AssemblyQualifiedName; | |
var originatingEndpoint = "originatingEndpoint"; | |
var originatingMachine = "originatingMachine"; | |
var failedMessageBody = new FinancedTransactionsRequest | |
{ | |
CorrelationId = Guid.NewGuid() | |
}; | |
var failedMessage = CreateFailedMessage(SerializeToByteArray(failedMessageBody), messageType, originatingEndpoint, originatingMachine); | |
// Act | |
sut.HandleFailedMessage(new object(), failedMessage); | |
// Assert | |
asiLogger.Verify(a => a.DebugFormat(sut, HostConstants.FailedMessageResponderResponding, typeof(FinancedTransactionsFailedResponse), messageType), Times.Once); | |
Assert.AreEqual(1, endpointInstance.SentMessages.Length); | |
var sentMessage = endpointInstance.SentMessages.First(); | |
Assert.IsInstanceOf<FinancedTransactionsFailedResponse>(sentMessage.Message); | |
var message = (FinancedTransactionsFailedResponse) sentMessage.Message; | |
Assert.AreEqual(failedMessageBody.CorrelationId, message.CorrelationId); | |
} | |
private byte[] SerializeToByteArray(object message) | |
{ | |
using (var stream = new MemoryStream()) | |
{ | |
var jsonWriter = CreateJsonReader(stream); | |
JsonSerializer.Create().Serialize(jsonWriter, message); | |
jsonWriter.Flush(); | |
return stream.ToArray(); | |
} | |
} | |
private JsonWriter CreateJsonReader(Stream stream) | |
{ | |
var streamWriter = new StreamWriter(stream, Encoding.UTF8); | |
return new JsonTextWriter(streamWriter); | |
} | |
private FailedMessage CreateFailedMessage(byte[] body, string enclosedMessageTypes = "", string originatingEndpoint = "", string originatingMachine = "", string exceptionMessage = "") | |
{ | |
var headers = new Dictionary<string, string> | |
{ | |
{"NServiceBus.EnclosedMessageTypes", enclosedMessageTypes}, | |
{"NServiceBus.OriginatingEndpoint", originatingEndpoint}, | |
{"NServiceBus.OriginatingMachine", originatingMachine}, | |
}; | |
return new FailedMessage(Guid.NewGuid().ToString(), headers, body, new Exception(exceptionMessage), ""); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void SubscribeToErrorQueueNotifications(Notifications notifications) | |
{ | |
var errors = notifications.Errors; | |
var errorMessageResponder = Container.Resolve<IFailedMessageResponder>(); | |
errors.MessageSentToErrorQueue += LogMessageSentToErrorQueue; | |
errors.MessageSentToErrorQueue += errorMessageResponder.HandleFailedMessage; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You'll want to publish the
failedMessageEvent
instead of sending it, that would also mean the code to get the originating endpoint and machine name could be removed.