Created
September 11, 2013 08:05
-
-
Save danbarua/6520603 to your computer and use it in GitHub Desktop.
Custom version of EasyNetQ's AutoSubscriber
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Foo.Common.Messaging.EasyNetQ | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
using System.Security.Cryptography; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Castle.Windsor; | |
using global::EasyNetQ; | |
using global::EasyNetQ.Topology; | |
using Foo.Common.Interfaces; | |
using ServiceStack.Logging; | |
/// <summary>Hooks up Domain Message Handlers in a <see cref="IWindsorContainer"/> with EasyNetQ subscriptions on a <see cref="IBus"/>.</summary> | |
public class EasyNetQAdvancedAutoSubscriber | |
{ | |
/// <summary>The log.</summary> | |
private static readonly ILog Log = LogManager.GetLogger(typeof(EasyNetQAdvancedAutoSubscriber)); | |
/// <summary>The bus.</summary> | |
private readonly IBus bus; | |
/// <summary> | |
/// The container. | |
/// </summary> | |
private readonly IWindsorContainer container; | |
/// <summary>Initialises a new instance of the <see cref="EasyNetQAdvancedAutoSubscriber"/> class.</summary> | |
/// <param name="bus">The bus.</param> | |
/// <param name="container">The container.</param> | |
/// <exception cref="ArgumentNullException">Thrown if <paramref name="bus"/> or <paramref name="container"/> is null.</exception> | |
public EasyNetQAdvancedAutoSubscriber(IBus bus, IWindsorContainer container) | |
{ | |
if (bus == null) throw new ArgumentNullException("bus"); | |
if (container == null) throw new ArgumentNullException("container"); | |
this.bus = bus; | |
this.container = container; | |
this.GenerateSubscriptionId = this.DefaultSubscriptionIdGenerator; | |
this.MessageDispatcher = new EasyNetQWindsorMessageDispatcher(container); | |
} | |
/// <summary> | |
/// Gets or sets a function responsible for generating SubscriptionIds, when you use | |
/// <see cref="IConsume{T}" />, since it does not let you specify | |
/// specific SubscriptionIds. | |
/// Message type and SubscriptionId is the key; which if two | |
/// equal keys exists, you will get round robin consumption of | |
/// messages. | |
/// </summary> | |
public Func<ConsumerInfo, string> GenerateSubscriptionId { protected get; set; } | |
/// <summary> | |
/// Gets or sets an <see cref="IMessageDispatcher"/> dispatcher responsible for consuming a message with the relevant message consumer. | |
/// </summary> | |
public IMessageDispatcher MessageDispatcher { get; set; } | |
/// <summary> | |
/// Scans the <see cref="IWindsorContainer"/> supplied in the constructor to hook up | |
/// any instances of ICommandHandler and IEventHandler with a subscription on RabbitMQ. | |
/// </summary> | |
public void SubscribeMessageHandlers() | |
{ | |
var advancedSubscribeMethod = this.GetSubscribeMethodOfAdvancedBus(); | |
var candidateMessageHandlerTypes = | |
this.container.Kernel.GetAssignableHandlers(typeof(object)).Select(x => x.ComponentModel.Implementation); | |
var subscriptionInfos = this.GetSubscriptionInfos(candidateMessageHandlerTypes); | |
try | |
{ | |
foreach (var kv in subscriptionInfos) | |
{ | |
foreach (var subscriptionInfo in kv.Value) | |
{ | |
var wrappedMessageParameterType = | |
typeof(IMessage<>).MakeGenericType(subscriptionInfo.MessageType); | |
var dispatchMethod = | |
this.MessageDispatcher.GetType() | |
.GetMethod("AdvancedDispatch", BindingFlags.Instance | BindingFlags.Public) | |
.MakeGenericMethod(subscriptionInfo.MessageType, subscriptionInfo.ConcreteType); | |
var dispatchMethodType = typeof(Func<,,>).MakeGenericType( | |
wrappedMessageParameterType, typeof(MessageReceivedInfo), typeof(Task)); | |
var dispatchDelegate = Delegate.CreateDelegate( | |
dispatchMethodType, this.MessageDispatcher, dispatchMethod); | |
var subscriptionId = this.GenerateSubscriptionId(subscriptionInfo); | |
Log.DebugFormat( | |
"Hooking up {0}-{1} with subscriptionID {2}", | |
subscriptionInfo.ConcreteType, | |
subscriptionInfo.MessageType, | |
subscriptionId); | |
var queue = Queue.DeclareDurable(subscriptionId); | |
var exchange = Exchange.DeclareTopic( | |
TypeNameSerializer.Serialize(subscriptionInfo.MessageType), true, false, null); | |
queue.BindTo(exchange, "#"); | |
var subscribeMethod = advancedSubscribeMethod.MakeGenericMethod(subscriptionInfo.MessageType); | |
subscribeMethod.Invoke(this.bus.Advanced, new object[] { queue, dispatchDelegate }); | |
} | |
} | |
} | |
catch (Exception ex) | |
{ | |
Log.Error(ex); | |
throw; | |
} | |
} | |
/// <summary>The default subscription id generator.</summary> | |
/// <param name="c">The c.</param> | |
/// <returns>The <see cref="string"/>.</returns> | |
protected virtual string DefaultSubscriptionIdGenerator(ConsumerInfo c) | |
{ | |
var r = new StringBuilder(); | |
var unique = string.Concat(c.ConcreteType.FullName, ":", c.MessageType.FullName); | |
using (var md5 = MD5.Create()) | |
{ | |
var buff = md5.ComputeHash(Encoding.UTF8.GetBytes(unique)); | |
foreach (var b in buff) | |
{ | |
r.Append(b.ToString("x2")); | |
} | |
} | |
return string.Concat(c.MessageType.Name, ":", c.ConcreteType.Name, ":", r.ToString()); | |
} | |
/// <summary>The get subscribe async method of bus.</summary> | |
/// <returns>The <see cref="MethodInfo"/>.</returns> | |
protected MethodInfo GetSubscribeAsyncMethodOfBus() | |
{ | |
var info = | |
this.bus.GetType() | |
.GetMethods() | |
.Where(m => m.Name == "SubscribeAsync") | |
.Select(m => new { Method = m, Params = m.GetParameters() }) | |
.Single(m => m.Params.Length == 2 && m.Params[0].ParameterType == typeof(string)) | |
.Method; | |
return info; | |
} | |
/// <summary>The get subscribe method of advanced bus.</summary> | |
/// <returns>The <see cref="MethodInfo"/>.</returns> | |
protected MethodInfo GetSubscribeMethodOfAdvancedBus() | |
{ | |
// this is brittle - we're looking for | |
// public virtual void Subscribe<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> | |
// not | |
// public virtual void Subscribe(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage) | |
// it will do for now but may break if new APIs are added to the EasyNetQ Advanced Bus Api (IAdvancedBus) | |
var info = | |
this.bus.Advanced.GetType() | |
.GetMethods() | |
.Where(x => x.Name == "Subscribe") | |
.Select(x => new { Method = x, Params = x.GetParameters() }) | |
.Single( | |
x => | |
x.Params.Length == 2 && x.Params[0].ParameterType == typeof(IQueue) | |
&& x.Params[1].ParameterType | |
!= typeof(Func<byte[], MessageProperties, MessageReceivedInfo, Task>)) | |
.Method; | |
return info; | |
} | |
/// <summary>Gets Subscriber Information.</summary> | |
/// <param name="types">The types.</param> | |
/// <returns>The a collection of KeyValuePairs mapping types to ConsumerInfo array.</returns> | |
protected virtual IEnumerable<KeyValuePair<Type, ConsumerInfo[]>> GetSubscriptionInfos(IEnumerable<Type> types) | |
{ | |
return from concreteType in types.Where(t => t.IsClass) | |
let subscriptionInfos = | |
concreteType.GetInterfaces() | |
.Where( | |
i => | |
i.IsGenericType | |
&& (i.GetGenericTypeDefinition() == typeof(IEventHandler<>) | |
|| i.GetGenericTypeDefinition() == typeof(ICommandHandler<>))) | |
.Select(i => new ConsumerInfo(concreteType, i, i.GetGenericArguments()[0])) | |
.ToArray() | |
where subscriptionInfos.Any() | |
select new KeyValuePair<Type, ConsumerInfo[]>(concreteType, subscriptionInfos); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment