Created
March 13, 2020 11:19
-
-
Save SzymonPobiega/57acdc53d64bbff753ca7acdadd1d5cb to your computer and use it in GitHub Desktop.
Raw message processing using plain NServiceBus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using NServiceBus; | |
using NServiceBus.Extensibility; | |
using NServiceBus.Pipeline; | |
using NServiceBus.Routing; | |
using NServiceBus.Transport; | |
namespace TestRawProcessing | |
{ | |
/// <summary>
/// Callback signature for handling a failure that occurred while processing a raw message.
/// </summary>
/// <param name="messageContext">The incoming message whose processing failed.</param>
/// <param name="dispatcher">Dispatcher the callback may use to send messages.</param>
/// <returns>
/// A task yielding an <see cref="ErrorHandleResult"/>; <c>RawProcessingBehavior</c> rethrows
/// when the result is <see cref="ErrorHandleResult.RetryRequired"/>.
/// </returns>
public delegate Task<ErrorHandleResult> OnError(IncomingMessage messageContext, IDispatchMessages dispatcher); | |
/// <summary>
/// Console host that starts an NServiceBus endpoint named "RawEndpoint" and plugs a
/// <see cref="RawProcessingBehavior"/> into its pipeline so that incoming messages are
/// handed to <see cref="OnMessage"/> and failures to <see cref="OnError"/>.
/// </summary>
class Program
{
    /// <summary>
    /// Process entry point; blocks until <see cref="Start"/> has completed.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args) => Start().GetAwaiter().GetResult();

    /// <summary>
    /// Configures and starts the endpoint, waits for the user to press Enter, then stops it.
    /// </summary>
    static async Task Start()
    {
        var endpointConfiguration = new EndpointConfiguration("RawEndpoint");
        endpointConfiguration.UseTransport<LearningTransport>();
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.SendFailedMessagesTo("error");

        // The behavior receives both callbacks plus a dispatcher resolved from the builder.
        endpointConfiguration.Pipeline.Register(
            builder => new RawProcessingBehavior(OnMessage, OnError, builder.Build<IDispatchMessages>()),
            "Process raw messages");

        // The custom policy ignores its inputs and always answers with an immediate retry.
        var recoverability = endpointConfiguration.Recoverability();
        recoverability.CustomPolicy((recoverabilityConfig, errorContext) => RecoverabilityAction.ImmediateRetry());

        var runningEndpoint = await Endpoint.Start(endpointConfiguration);

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();

        await runningEndpoint.Stop();
    }

    /// <summary>
    /// Error callback: unconditionally reports that the failed message must be retried.
    /// </summary>
    /// <param name="messagecontext">The message whose processing failed (not inspected).</param>
    /// <param name="dispatcher">Dispatcher made available to the callback (not used).</param>
    /// <returns>A completed task carrying <see cref="ErrorHandleResult.RetryRequired"/>.</returns>
    static Task<ErrorHandleResult> OnError(IncomingMessage messagecontext, IDispatchMessages dispatcher)
    {
        var verdict = ErrorHandleResult.RetryRequired;
        return Task.FromResult(verdict);
    }

    /// <summary>
    /// Message callback: forwards the incoming message (same id, headers and body) to the
    /// "Receiver" address through the supplied dispatcher.
    /// </summary>
    /// <param name="message">The incoming raw message to forward.</param>
    /// <param name="dispatcher">Dispatcher used to send the forwarded copy.</param>
    /// <returns>The task returned by the dispatch call.</returns>
    static Task OnMessage(IncomingMessage message, IDispatchMessages dispatcher)
    {
        var forwardedMessage = new OutgoingMessage(message.MessageId, message.Headers, message.Body);
        var destination = new UnicastAddressTag("Receiver");
        var operations = new TransportOperations(new TransportOperation(forwardedMessage, destination));

        // A fresh transport transaction and context bag are created for every dispatch.
        return dispatcher.Dispatch(operations, new TransportTransaction(), new ContextBag());
    }
}
/// <summary>
/// Pipeline behavior for <see cref="ITransportReceiveContext"/> that hands the incoming
/// message to a user-supplied callback instead of continuing down the pipeline, and
/// consults a second callback when the first one throws.
/// </summary>
class RawProcessingBehavior : Behavior<ITransportReceiveContext>
{
    readonly Func<IncomingMessage, IDispatchMessages, Task> messageHandler;
    readonly OnError errorHandler;
    readonly IDispatchMessages messageDispatcher;

    /// <summary>
    /// Creates the behavior.
    /// </summary>
    /// <param name="onMessage">Callback invoked for every incoming message.</param>
    /// <param name="onError">Callback invoked when <paramref name="onMessage"/> throws.</param>
    /// <param name="dispatcher">Dispatcher passed to both callbacks.</param>
    public RawProcessingBehavior(Func<IncomingMessage, IDispatchMessages, Task> onMessage, OnError onError, IDispatchMessages dispatcher)
    {
        messageHandler = onMessage;
        errorHandler = onError;
        messageDispatcher = dispatcher;
    }

    /// <summary>
    /// Runs the message callback. If it throws, the error callback decides the outcome:
    /// a result of <see cref="ErrorHandleResult.RetryRequired"/> causes a new exception
    /// (wrapping the original) to be thrown; any other result swallows the failure.
    /// </summary>
    /// <param name="context">Receive context providing the incoming message.</param>
    /// <param name="next">Continuation of the pipeline; it is never invoked by this behavior.</param>
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        try
        {
            await messageHandler(context.Message, messageDispatcher);
        }
        catch (Exception failure)
        {
            var outcome = await errorHandler(context.Message, messageDispatcher);
            if (outcome != ErrorHandleResult.RetryRequired)
            {
                // The error callback did not request a retry, so the failure ends here.
                return;
            }

            // Propagate the failure to the caller, keeping the original as the inner exception.
            throw new Exception("Retry required", failure);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment