Created
October 2, 2020 14:54
-
-
Save mrclayman/2d95d8d6db349a1e76b0b5a694a827cb to your computer and use it in GitHub Desktop.
MassTransit saga registration using IServiceCollectionBusConfigurator vs. IRabbitMqBusFactoryConfigurator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Extension methods that register saga state machines directly on the
/// RabbitMQ bus factory configurator.
/// </summary>
public static class IRabbitMqBusFactoryConfiguratorExtensions
{
    /// <summary>
    /// Declares a receive endpoint for a saga state machine. The queue name,
    /// prefetch count and retry settings are read from the receive endpoint
    /// configuration stored under <paramref name="endpointConfigurationKey"/>.
    /// </summary>
    /// <typeparam name="TSagaState">Saga instance (state) type.</typeparam>
    /// <typeparam name="TSagaStateMachine">State machine type. It is created through
    /// reflection and must expose a constructor accepting the
    /// <see cref="IRabbitMqConfiguration"/> passed to this method.</typeparam>
    /// <typeparam name="TRepository">Saga repository type. It is created through
    /// reflection using its parameterless constructor.</typeparam>
    /// <param name="configurator">Bus factory configurator the endpoint is added to.</param>
    /// <param name="configuration">Bus configuration object.</param>
    /// <param name="endpointConfigurationKey">Key of the receive endpoint configuration.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="configurator"/> or <paramref name="configuration"/> is null, or
    /// <paramref name="endpointConfigurationKey"/> is null or empty.</exception>
    /// <exception cref="ConfigurationErrorsException">
    /// No receive endpoint configuration was returned for the key, or it does not
    /// define a queue name.</exception>
    public static void ConfigureSagaReceiveEndpoint<TSagaState, TSagaStateMachine, TRepository>(
        this IRabbitMqBusFactoryConfigurator configurator,
        IRabbitMqConfiguration configuration,
        string endpointConfigurationKey)
        where TSagaState : class, SagaStateMachineInstance
        where TSagaStateMachine : RabbitMqSagaStateMachine<TSagaState>
        where TRepository : ISagaRepository<TSagaState>
    {
        // An extension method can still be invoked on a null reference, so
        // fail early instead of with a NullReferenceException further down.
        if (configurator == null)
        {
            throw new ArgumentNullException(nameof(configurator));
        }

        if (configuration == null)
        {
            throw new ArgumentNullException(nameof(configuration));
        }

        // Kept as ArgumentNullException for an empty key as well, so existing
        // callers that catch this exception type keep working.
        if (string.IsNullOrEmpty(endpointConfigurationKey))
        {
            throw new ArgumentNullException(nameof(endpointConfigurationKey));
        }

        IRabbitMqReceiveEndpointConfiguration endpointConfiguration =
            configuration.GetReceiveEndpointConfiguration(endpointConfigurationKey);

        // Guard against a lookup that yields nothing; without this check the
        // queue name test below would dereference null.
        if (endpointConfiguration == null)
        {
            throw new ConfigurationErrorsException(
                $"Receive endpoint configuration '{endpointConfigurationKey}' was not found");
        }

        if (string.IsNullOrEmpty(endpointConfiguration.Queue))
        {
            throw new ConfigurationErrorsException("Queue name must be defined for a saga receive endpoint");
        }

        configurator.ReceiveEndpoint(endpointConfiguration.Queue, e =>
        {
            e.PrefetchCount = endpointConfiguration.PrefetchCount;

            // Interval retry policy; RetryWaitInterval is interpreted as milliseconds.
            e.UseMessageRetry(x =>
            {
                x.Interval(
                    endpointConfiguration.RetryCount,
                    TimeSpan.FromMilliseconds(endpointConfiguration.RetryWaitInterval));
            });

            // Both the state machine and its repository are instantiated via
            // reflection rather than being resolved from a container.
            e.StateMachineSaga(
                (TSagaStateMachine)Activator.CreateInstance(typeof(TSagaStateMachine), configuration),
                Activator.CreateInstance<TRepository>());
        });
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Extension methods that register saga state machines on the MassTransit
/// service collection (container) configurator.
/// </summary>
public static class IServiceCollectionBusConfiguratorExtensions
{
    /// <summary>
    /// Registers a saga state machine with a Redis saga repository. Retry settings
    /// and the prefetch count are read from the receive endpoint configuration
    /// stored under <paramref name="endpointConfigurationKey"/>.
    /// </summary>
    /// <remarks>
    /// Only the prefetch count is applied to the endpoint definition here; the
    /// queue name from the endpoint configuration is not used by this method.
    /// </remarks>
    /// <typeparam name="TSagaState">Saga instance (state) type.</typeparam>
    /// <typeparam name="TSagaStateMachine">State machine type.</typeparam>
    /// <param name="configurator">Container configurator the saga is registered on.</param>
    /// <param name="configuration">Bus configuration object.</param>
    /// <param name="endpointConfigurationKey">Key of the receive endpoint configuration.</param>
    /// <param name="redisConnectionString">Connection string passed to the Redis saga repository.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="configurator"/> or <paramref name="redisConnectionString"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="redisConnectionString"/> is empty.</exception>
    public static void ConfigureRedisSagaStateMachine<TSagaState, TSagaStateMachine>(
        this IServiceCollectionBusConfigurator configurator,
        IRabbitMqConfiguration configuration,
        string endpointConfigurationKey,
        string redisConnectionString)
        where TSagaState : class, SagaStateMachineInstance, ISagaVersion
        where TSagaStateMachine : RabbitMqSagaStateMachine<TSagaState>
    {
        // An extension method can still be invoked on a null reference.
        if (configurator == null)
        {
            throw new ArgumentNullException(nameof(configurator));
        }

        // Reject an unusable connection string up front rather than letting the
        // Redis repository registration fail with a less specific error.
        if (redisConnectionString == null)
        {
            throw new ArgumentNullException(nameof(redisConnectionString));
        }

        if (redisConnectionString.Length == 0)
        {
            throw new ArgumentException(
                "Redis connection string must not be empty", nameof(redisConnectionString));
        }

        // NOTE(review): GetReceiveEndpointConfiguration(configuration, key) is a helper
        // that is not part of this excerpt; presumably it validates both arguments
        // and looks up the endpoint configuration - confirm against its definition.
        IRabbitMqReceiveEndpointConfiguration endpointConfiguration =
            GetReceiveEndpointConfiguration(configuration, endpointConfigurationKey);

        configurator
            .AddSagaStateMachine<TSagaStateMachine, TSagaState>(sc =>
            {
                // Interval retry policy; RetryWaitInterval is interpreted as milliseconds.
                sc.UseMessageRetry(rc =>
                {
                    rc.Interval(
                        endpointConfiguration.RetryCount,
                        TimeSpan.FromMilliseconds(endpointConfiguration.RetryWaitInterval));
                });
            })
            .RedisRepository(redisConnectionString)
            .Endpoint(c =>
            {
                c.PrefetchCount = endpointConfiguration.PrefetchCount;
            });
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Base class for saga state machines that need access to the RabbitMQ bus
/// configuration, e.g. to resolve send endpoint addresses.
/// </summary>
/// <typeparam name="TInstance">Saga instance (state) type.</typeparam>
public abstract class RabbitMqSagaStateMachine<TInstance> : MassTransitStateMachine<TInstance>
    where TInstance : class, SagaStateMachineInstance
{
    /// <summary>
    /// Prefix for URIs denoting queue names.
    /// </summary>
    private const string QueueUriPrefix = "queue:";

    /// <summary>
    /// Gets the bus configuration object.
    /// </summary>
    protected IRabbitMqConfiguration BusConfiguration { get; }

    /// <summary>
    /// Constructor.
    /// </summary>
    /// <param name="busConfiguration">Bus configuration object.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="busConfiguration"/> is null.</exception>
    protected RabbitMqSagaStateMachine(IRabbitMqConfiguration busConfiguration)
    {
        BusConfiguration = busConfiguration ?? throw new ArgumentNullException(nameof(busConfiguration));
    }

    /// <summary>
    /// Returns a URI object with the preconfigured address of the given
    /// send endpoint. It is assumed the address in the configuration
    /// already contains the queue URI prefix.
    /// </summary>
    /// <param name="name">Name of the send endpoint in the configuration.</param>
    /// <returns>URI object with the address from the desired send endpoint
    /// configuration block.</returns>
    /// <exception cref="InvalidOperationException">
    /// No send endpoint configuration was returned for <paramref name="name"/>.</exception>
    protected Uri GetSendEndpointUriFromConfiguration(string name)
    {
        var sendEndpointConfiguration = BusConfiguration.GetSendEndpointConfiguration(name);

        // Surface a missing configuration block with a descriptive error
        // instead of a NullReferenceException on the Address access.
        if (sendEndpointConfiguration == null)
        {
            throw new InvalidOperationException(
                $"Send endpoint configuration '{name}' was not found");
        }

        return new Uri(sendEndpointConfiguration.Address);
    }

    /// <summary>
    /// Returns a URI object constructed from the given queue name. The queue
    /// prefix is automatically prepended to the queue name.
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <returns>New URI object based on the given queue name.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queueName"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="queueName"/> is empty.</exception>
    protected Uri GetSendEndpointUriForQueueName(string queueName)
    {
        // Without these checks a null or empty name would be turned into a
        // URI consisting of the bare prefix, hiding the caller's mistake.
        if (queueName == null)
        {
            throw new ArgumentNullException(nameof(queueName));
        }

        if (queueName.Length == 0)
        {
            throw new ArgumentException("Queue name must not be empty", nameof(queueName));
        }

        return new Uri($"{QueueUriPrefix}{queueName}");
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Startup
{
    /// <summary>
    /// Registers MassTransit with the service collection: consumers from this
    /// assembly, the update and upload saga state machines backed by a Redis
    /// saga repository, and a RabbitMQ bus.
    /// </summary>
    /// <param name="services">Service collection MassTransit is added to.</param>
    private void InitializeMessageBus(IServiceCollection services)
    {
        // Bus and Redis settings are both derived from the application configuration.
        IRabbitMqConfiguration rabbitMqConfig = RabbitMqConfiguration.Create(Configuration);
        string redisConnectionString = ConfigurationTools.GetRedisConnectionString(Configuration);

        // Add MassTransit to the services and configure its consumers and sagas.
        services.AddMassTransit(c =>
        {
            c.AddConsumers(Assembly.GetAssembly(typeof(Startup)));

            c.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(rabbitMqConfig.Host, rabbitMqConfig.VirtualHost, h =>
                {
                    h.Username(rabbitMqConfig.User);
                    h.Password(rabbitMqConfig.Password);
                });

                // Create receive endpoints for every consumer and saga registered
                // on the container configurator. Without this call the registrations
                // (including their Endpoint settings) are never bound to the bus.
                // The manual alternative is to declare each saga endpoint here with
                // cfg.ConfigureSagaReceiveEndpoint<...>(...) instead of registering
                // the sagas on the container; do not combine both for the same saga.
                cfg.ConfigureEndpoints(ctx);
            });

            c.ConfigureRedisSagaStateMachine<UpdateSagaState, UpdateSagaStateMachine>(
                rabbitMqConfig, UpdateSagaStateMachine.EndpointConfigurationKey, redisConnectionString);
            c.ConfigureRedisSagaStateMachine<UploadSagaState, UploadSagaStateMachine>(
                rabbitMqConfig, UploadSagaStateMachine.EndpointConfigurationKey, redisConnectionString);
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment