@SzymonPobiega
Created March 10, 2020 07:26
Hacks in NServiceBus.Raw

Stuff passed via settings

Values that transports expect to find in settings because they are not passed through the transport seam:

Settings.Set("Endpoint.SendOnly", sendOnly);
Settings.Set("TypesToScan", new Type[0]);
Settings.Set("NServiceBus.Routing.EndpointName", endpointName);
Settings.Set<Conventions>(new Conventions()); //Hack for ASB
Settings.Set<StartupDiagnosticEntries>(new StartupDiagnosticEntries());
Settings.Set<QueueBindings>(new QueueBindings());
Settings.SetDefault("Transactions.IsolationLevel", IsolationLevel.ReadCommitted);
Settings.SetDefault("Transactions.DefaultTimeout", TransactionManager.DefaultTimeout);
Settings.Set("errorQueue", poisonMessageQueue); //Hack for MSMQ
Settings.Set<TransportInfrastructure>(transportInfrastructure);

Connection string

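string GetConnectionString(TransportDefinition transportDefinition)
{
    // connectionStringType and connectionStringGetter are reflection handles
    // (a Type and a MethodInfo) resolved elsewhere; that code is not shown here.
    var instance = settings.Get(connectionStringType.FullName);
    return (string) connectionStringGetter.Invoke(instance, new object[] {transportDefinition});
}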

CriticalError

RawCriticalError overrides the Raise method and copies most of the original code. The override is needed because the original cannot be used: it checks its Endpoint field and does nothing when that field has not been set.

ReceivingComponent

The receiving component's constructor is private, but transports use an instance of that component to obtain the LocalAddress, so the instance is created via reflection:

static void RegisterReceivingComponent(SettingsHolder settings, LogicalAddress logicalAddress, string localAddress)
{
    const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.CreateInstance;
    var parameters = new[]
    {
        typeof(LogicalAddress),
        typeof(string),
        typeof(string),
        typeof(string),
        typeof(TransportTransactionMode),
        typeof(PushRuntimeSettings),
        typeof(bool)
    };
    var ctor = typeof(Endpoint).Assembly.GetType("NServiceBus.ReceiveConfiguration", true).GetConstructor(flags, null, parameters, null);

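    // Reflection substitutes the default value when null is passed for a value-type parameter
    // (here TransportTransactionMode).
    var receiveConfig = ctor.Invoke(new object[] { logicalAddress, localAddress, localAddress, null, null, null, false });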
    settings.Set("NServiceBus.ReceiveConfiguration", receiveConfig);
}

SubscriptionManager

The factory for SubscriptionManager is not public, so it is read via reflection:

static IManageSubscriptions CreateSubscriptionManager(TransportInfrastructure transportInfra)
{
    var subscriptionInfra = transportInfra.ConfigureSubscriptionInfrastructure();
    var factoryProperty = typeof(TransportSubscriptionInfrastructure).GetProperty("SubscriptionManagerFactory", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
    var factoryInstance = (Func<IManageSubscriptions>)factoryProperty.GetValue(subscriptionInfra, new object[0]);
    return factoryInstance();
}

Start vs start receiving

This may no longer be the case, but the NServiceBus API did not allow starting the endpoint and starting the receive process at different points in time. This decoupling is very useful when connecting two transports (as in Router or TransportAdapter), because you want to be sure you can send messages to both sides before you start receiving from either. Otherwise you can end up receiving from one side while not yet being able to send to the other.
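
The ordering described above can be sketched as a two-phase start. This is only an illustration under stated assumptions: ISendOnlySide, IReceivingSide, FakeSide and Bridge are hypothetical stand-ins invented for this sketch, not NServiceBus or NServiceBus.Raw types, and the "transport" just appends to a log.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins, NOT the NServiceBus API: a started side that can
// already send, and a separate step that turns on the receive loop.
public interface ISendOnlySide
{
    string Name { get; }
    Task<IReceivingSide> StartReceiving();
}

public interface IReceivingSide
{
    string Name { get; }
}

public sealed class FakeSide : ISendOnlySide, IReceivingSide
{
    readonly List<string> log;

    FakeSide(string name, List<string> log)
    {
        Name = name;
        this.log = log;
    }

    public string Name { get; }

    // Phase 1: set up the sending infrastructure only.
    public static Task<ISendOnlySide> Start(string name, List<string> log)
    {
        log.Add($"{name}: can send");
        return Task.FromResult<ISendOnlySide>(new FakeSide(name, log));
    }

    // Phase 2: begin pulling messages from the input queue.
    public Task<IReceivingSide> StartReceiving()
    {
        log.Add($"{Name}: receiving");
        return Task.FromResult<IReceivingSide>(this);
    }
}

public static class Bridge
{
    public static async Task<List<string>> StartBothSides()
    {
        var log = new List<string>();

        // Both sides become able to send before either starts receiving...
        var left = await FakeSide.Start("left", log);
        var right = await FakeSide.Start("right", log);

        // ...so a message received on one side can always be forwarded to the other.
        await left.StartReceiving();
        await right.StartReceiving();

        return log;
    }
}
```

Returning a different type from each phase makes the ordering hard to get wrong: receiving can only be started on something that has already been started for sending.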

Async-enabled error handling policy

Making the error handling policy return a Task enables scenarios such as putting the failed message back at the end of the input queue or storing it in a database.

Task<ErrorHandleResult> OnError(IErrorHandlingPolicyContext handlingContext, IDispatchMessages dispatcher);
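
As a rough illustration of the "end of the input queue" scenario, the sketch below awaits a dispatch inside the policy before reporting the failure as handled. Everything in it is an assumption made for the example: FailedMessage, ISketchDispatcher, InMemoryDispatcher and RequeuePolicy are hypothetical, the ErrorHandleResult enum is a local stand-in rather than the NServiceBus type, and the attempt counter is passed in directly instead of being tracked in a message header.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-ins for illustration only; they are NOT the NServiceBus types.
public enum ErrorHandleResult { Handled, RetryRequired }

public sealed class FailedMessage
{
    public FailedMessage(string id, int attempts)
    {
        Id = id;
        Attempts = attempts;
    }

    public string Id { get; }
    public int Attempts { get; }
}

public interface ISketchDispatcher
{
    Task Dispatch(string destination, FailedMessage message);
}

// Records what was sent where, so the policy can be exercised without a transport.
public sealed class InMemoryDispatcher : ISketchDispatcher
{
    public List<string> Sent { get; } = new List<string>();

    public Task Dispatch(string destination, FailedMessage message)
    {
        Sent.Add($"{destination}:{message.Id}");
        return Task.CompletedTask;
    }
}

public sealed class RequeuePolicy
{
    readonly string inputQueue;
    readonly string errorQueue;
    readonly int maxAttempts;

    public RequeuePolicy(string inputQueue, string errorQueue, int maxAttempts)
    {
        this.inputQueue = inputQueue;
        this.errorQueue = errorQueue;
        this.maxAttempts = maxAttempts;
    }

    // Because the policy returns a Task, it can await I/O (a send, a database write)
    // before telling the receive loop that the failure has been dealt with.
    public async Task<ErrorHandleResult> OnError(FailedMessage message, ISketchDispatcher dispatcher)
    {
        // Below the limit: send the message to the back of the input queue.
        // At or above the limit: give up and move it to the error queue.
        var destination = message.Attempts < maxAttempts ? inputQueue : errorQueue;
        await dispatcher.Dispatch(destination, message);
        return ErrorHandleResult.Handled;
    }
}
```

With a limit of 3, a message on its first attempt is sent back to the input queue, while one on its third attempt goes to the error queue; in both cases the policy reports Handled so the receive loop does not retry in place.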