Created
February 18, 2016 04:09
-
-
Save djmnz/acc2f484187d59d81d97 to your computer and use it in GitHub Desktop.
MassTransit + AzureServiceBus + AutoFac
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Configuration; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Autofac; | |
using MassTransit; | |
using MassTransit.AutofacIntegration; | |
using MassTransit.AzureServiceBusTransport; | |
using Microsoft.ServiceBus; | |
using StarNow.Events.Core.Configuration; | |
namespace StarNow.Core.Tests.Events | |
{ | |
public class ServiceBusIntegrationTest | |
{ | |
public static void Main() | |
{ | |
var tasks = new List<Task>(); | |
//tasks.Add(Publisher(CreateServiceBusReceiver(false))); | |
tasks.Add(Task.Factory.StartNew(() => Receiver(CreateAutoFacReceiver()))); | |
tasks.Add(Task.Factory.StartNew(() => Publisher(CreateAutoFacPublisher()))); | |
//tasks.Add(Receiver(CreateServiceBusReceiver(true))); | |
//tasks.Add(Receiver(CreateAutoFacReceiverManual())); | |
Task.WaitAll(tasks.ToArray()); | |
} | |
public async static Task Publisher(IBusControl bus) | |
{ | |
using (var handle = bus.Start()) | |
{ | |
int i = 0; | |
while (i < 10) | |
{ | |
i++; | |
if (i%3 == 0) | |
{ | |
await bus.Publish<SomethingElseHappened>(new SomethingElseHappenedEvent() | |
{ | |
Claro = new Built() { Message = "hello" }, | |
Blabla = "something else app" + i, | |
When = DateTime.UtcNow | |
}); | |
Debug.WriteLine("Published Else " + i); | |
} | |
else | |
{ | |
await bus.Publish<SomethingHappened>(new SomethingHappenedEvent() | |
{ | |
Test = "console app" + i, | |
When = DateTime.UtcNow | |
}); | |
Debug.WriteLine("Published Normal " + i); | |
} | |
Thread.Sleep(1000); | |
} | |
handle.Stop(); | |
} | |
} | |
public async static Task Receiver(IBusControl bus) | |
{ | |
using (var handle = bus.Start()) | |
{ | |
int i = 0; | |
while (i < 10) | |
{ | |
i++; | |
Debug.WriteLine("OK" + i); | |
Thread.Sleep(1000); | |
} | |
handle.Stop(); | |
} | |
} | |
private static IBusControl CreateAutoFacPublisher() | |
{ | |
var builder = new ContainerBuilder(); | |
builder.RegisterModule(new BusPublisherModule()); | |
var container = builder.Build(); | |
return container.Resolve<IBusControl>(); | |
} | |
private static IBusControl CreateAutoFacReceiver() | |
{ | |
var builder = new ContainerBuilder(); | |
builder.RegisterType<SomeDependency>() | |
.InstancePerLifetimeScope() | |
.AsSelf(); | |
//builder.RegisterModule(new BusConsumerModule(new IBusConsumerRegistration[] | |
//{ | |
// new BusConsumerRegistration<SomethingHappened,TestMessageConsumer>("something_happened"), | |
// new BusConsumerRegistration<SomethingElseHappened,ElseMessageConsumer>("something_else_happened"), | |
// new BusConsumerRegistration<SomethingElseHappened,SpecialElseMessageConsumer>("something_else_happened_otherqueue") | |
//}). | |
builder.RegisterModule( | |
new BusConsumerModule() | |
.AddConsumer<SomethingHappened, TestMessageConsumer>("something_happened") | |
.AddConsumer<SomethingElseHappened, ElseMessageConsumer>("something_else_happened") | |
.AddConsumer<SomethingElseHappened, SpecialElseMessageConsumer>("something_else_happened_otherqueue") | |
); | |
var container = builder.Build(); | |
return container.Resolve<IBusControl>(); | |
} | |
private static IBusControl CreateAutoFacReceiverManual() | |
{ | |
var builder = new ContainerBuilder(); | |
builder.RegisterType<TestMessageConsumer>(); | |
builder.RegisterGeneric(typeof(AutofacConsumerFactory<>)) | |
.WithParameter(new NamedParameter("name", "message")) | |
.As(typeof(IConsumerFactory<>)); | |
builder.Register(context => | |
{ | |
var busControl = Bus.Factory.CreateUsingAzureServiceBus(sbc => | |
{ | |
var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", | |
"SOMETHING-ns", | |
"WebEvents"); | |
var host = sbc.Host(serviceUri, h => | |
{ | |
h.OperationTimeout = TimeSpan.FromSeconds(5); | |
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider( | |
"RootManageSharedAccessKey", | |
"SOMETHING"); | |
}); | |
sbc.UseJsonSerializer(); | |
sbc.ReceiveEndpoint(host, "something_happened", ep => | |
{ | |
ep.Consumer<TestMessageConsumer>(context.Resolve<IConsumerFactory<TestMessageConsumer>>()); | |
}); | |
}); | |
return busControl; | |
}) | |
.SingleInstance() | |
.As<IBusControl>() | |
.As<IBus>(); | |
var container = builder.Build(); | |
return container.Resolve<IBusControl>(); | |
} | |
} | |
public interface Build | |
{ | |
string Message { get; set; } | |
} | |
public class Built : Build | |
{ | |
public string Message { get; set; } | |
} | |
public class Logge1r | |
{ | |
} | |
public class TestMessageConsumer : IConsumer<SomethingHappened> | |
{ | |
private readonly SomeDependency _dependency; | |
public TestMessageConsumer(SomeDependency dependency) | |
{ | |
_dependency = dependency; | |
} | |
public async Task Consume(ConsumeContext<SomethingHappened> context) | |
{ | |
Debug.WriteLine("SHIT MSG = " + context.Message.Test); | |
} | |
} | |
public class ElseMessageConsumer : IConsumer<SomethingElseHappened> | |
{ | |
public async Task Consume(ConsumeContext<SomethingElseHappened> context) | |
{ | |
Debug.WriteLine("ELSE MSG CLARO = " + context.Message.Claro.Message); | |
Debug.WriteLine("ELSE MSG = " + context.Message.Blabla); | |
} | |
} | |
public class SpecialElseMessageConsumer : IConsumer<SomethingElseHappened> | |
{ | |
public async Task Consume(ConsumeContext<SomethingElseHappened> context) | |
{ | |
Debug.WriteLine("SPECIAL ELSE MSG CLARO = " + context.Message.Claro.Message); | |
Debug.WriteLine("SPECIAL ELSE MSG = " + context.Message.Blabla); | |
} | |
} | |
public interface SomethingElseHappened | |
{ | |
DateTime When { get; set; } | |
string Blabla { get; set; } | |
Build Claro { get; set; } | |
} | |
public interface SomethingHappened | |
{ | |
DateTime When { get; set; } | |
string Test { get; set; } | |
} | |
public class SomethingHappenedEvent : SomethingHappened | |
{ | |
public SomethingHappenedEvent() | |
{ | |
} | |
public DateTime When { get; set; } | |
public string Test { get; set; } | |
} | |
public class SomethingElseHappenedEvent : SomethingElseHappened | |
{ | |
public DateTime When { get; set; } | |
public string Blabla { get; set; } | |
public Build Claro { get; set; } | |
} | |
public class SomeDependency : IDisposable | |
{ | |
private readonly int _instance = Total++; | |
private static int Total = 0; | |
public SomeDependency() | |
{ | |
Debug.WriteLine($"Created {_instance}"); | |
} | |
public void Dispose() | |
{ | |
Debug.WriteLine($"DISPOSED {_instance}"); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Autofac; | |
using MassTransit; | |
using MassTransit.AutofacIntegration; | |
using MassTransit.AzureServiceBusTransport; | |
using Microsoft.ServiceBus; | |
namespace StarNow.Events.Core.Configuration | |
{ | |
public class BusConsumerModule : BusModule | |
{ | |
private readonly List<IBusConsumerRegistration> _consumers; | |
public BusConsumerModule() | |
{ | |
_consumers = new List<IBusConsumerRegistration>(); | |
} | |
public BusConsumerModule(IEnumerable<IBusConsumerRegistration> consumers) | |
{ | |
this._consumers = new List<IBusConsumerRegistration>(consumers); | |
} | |
protected override void Load(ContainerBuilder builder) | |
{ | |
foreach (var consumer in _consumers) | |
{ | |
consumer.RegisterService(builder); | |
} | |
builder.RegisterGeneric(typeof(AutofacConsumerFactory<>)) | |
.WithParameter(new NamedParameter("name", "message")) | |
.As(typeof(IConsumerFactory<>)); | |
builder.Register(context => | |
{ | |
var busControl = Bus.Factory.CreateUsingAzureServiceBus(sbc => | |
{ | |
var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", | |
"SOMETHING-ns", | |
"WebEvents"); | |
var host = sbc.Host(serviceUri, h => | |
{ | |
h.OperationTimeout = TimeSpan.FromSeconds(5); | |
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider( | |
"RootManageSharedAccessKey", | |
"SOMETHING"); | |
}); | |
sbc.UseJsonSerializer(); | |
foreach (var register in _consumers) | |
{ | |
sbc.ReceiveEndpoint(host, register.QueueName, ep => | |
{ | |
register.SetupEndpoint(ep, context); | |
}); | |
} | |
}); | |
return busControl; | |
}) | |
.SingleInstance() | |
.As<IBusControl>() | |
.As<IBus>(); | |
base.Load(builder); | |
} | |
public BusConsumerModule AddConsumer<TMessage, TConsumer>(string queueName, Action<BusConsumerRegistration<TMessage, TConsumer>> registrationConfiguration = null) | |
where TMessage : class | |
where TConsumer : class, IConsumer<TMessage> | |
{ | |
var busConsumerRegistration = new BusConsumerRegistration<TMessage, TConsumer>(queueName); | |
if (registrationConfiguration != null) | |
{ | |
registrationConfiguration(busConsumerRegistration); | |
} | |
_consumers.Add(busConsumerRegistration); | |
return this; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Autofac; | |
using Autofac.Builder; | |
using MassTransit; | |
using MassTransit.AzureServiceBusTransport; | |
namespace StarNow.Events.Core.Configuration | |
{ | |
public interface IBusConsumerRegistration | |
{ | |
string QueueName { get; set; } | |
void RegisterService(ContainerBuilder builder); | |
void SetupEndpoint(IServiceBusReceiveEndpointConfigurator ep, IComponentContext context); | |
} | |
public class BusConsumerRegistration<TMessage, TConsumer> : IBusConsumerRegistration | |
where TMessage : class | |
where TConsumer : class, IConsumer<TMessage> | |
{ | |
public BusConsumerRegistration(string queueName) | |
{ | |
QueueName = queueName; | |
} | |
public string QueueName { get; set; } | |
public Action<IRegistrationBuilder<TConsumer, ConcreteReflectionActivatorData, SingleRegistrationStyle>> ContainerExtensions | |
{ get; set; } | |
public void RegisterService(ContainerBuilder builder) | |
{ | |
var registrationBuilder = builder.RegisterType<TConsumer>().AsImplementedInterfaces().AsSelf(); | |
if (ContainerExtensions != null) ContainerExtensions(registrationBuilder); | |
} | |
public void SetupEndpoint(IServiceBusReceiveEndpointConfigurator ep, IComponentContext context) | |
{ | |
ep.Consumer<TConsumer>(context.Resolve<IConsumerFactory<TConsumer>>()); | |
} | |
public BusConsumerRegistration<TMessage, TConsumer> ConfigureContainer(Action<IRegistrationBuilder<TConsumer, ConcreteReflectionActivatorData, SingleRegistrationStyle>> configure) | |
{ | |
ContainerExtensions = configure; | |
return this; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment