Last active
August 29, 2015 14:01
-
-
Save randomcodenz/d13f308c2544cff3a856 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Message versioning is explicit and opt-in: a message type names the version it replaces by
// implementing the marker interface below. Exchange-to-exchange bindings then route messages
// from the publishing exchange on to the exchanges of the previous message versions.
// Message serialisation adds all message versions to the message headers, and deserialisation
// keeps trying the listed types until it finds one it can create (or runs out and throws an
// exception).

/// <summary>
/// Marker interface with no members. A message type implements it to declare that it
/// supersedes the message type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The superseded (previous version) message type; must be a reference type.</typeparam>
public interface ISupersede<T>
    where T : class
{
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A <see cref="RabbitAdvancedBus"/> that adds opt-in message versioning.
/// On publish, the serialised names of every message type superseded by the published type
/// (found via <see cref="ISupersede{T}"/>) are written to the
/// <c>Alternative-Message-Types</c> header. On consume, when the type named in the message
/// properties cannot be resolved, the types listed in that header are tried in order.
/// </summary>
public class RabbitAdvancedBusWithVersionSupport : RabbitAdvancedBus, IAdvancedBus
{
    // Header carrying the superseded type names, joined with the separator below.
    private const string AlternativeMessageTypesHeaderKey = "Alternative-Message-Types";
    private const string AlternativeMessageTypeSeparator = ";";

    private readonly ISerializer _serializer;
    private readonly Func<string> _getCorrelationId;
    private readonly ITypeNameSerializer _typeNameSerializer;
    private readonly IHandlerCollectionFactory _handlerCollectionFactory;

    /// <summary>
    /// Creates the bus. Every argument is forwarded unchanged to the base constructor; the
    /// serializer, correlation id factory, type name serializer and handler collection
    /// factory are additionally kept locally for use by the members defined on this class.
    /// </summary>
    public RabbitAdvancedBusWithVersionSupport(
        IConnectionFactory connectionFactory,
        ISerializer serializer,
        IConsumerFactory consumerFactory,
        IEasyNetQLogger logger,
        Func<string> getCorrelationId,
        IClientCommandDispatcherFactory clientCommandDispatcherFactory,
        IPublisher publisher,
        IEventBus eventBus,
        ITypeNameSerializer typeNameSerializer,
        IHandlerCollectionFactory handlerCollectionFactory,
        IContainer container)
        : base(
            connectionFactory,
            serializer,
            consumerFactory,
            logger,
            getCorrelationId,
            clientCommandDispatcherFactory,
            publisher,
            eventBus,
            typeNameSerializer,
            handlerCollectionFactory,
            container)
    {
        _serializer = serializer;
        _getCorrelationId = getCorrelationId;
        _typeNameSerializer = typeNameSerializer;
        _handlerCollectionFactory = handlerCollectionFactory;
    }

    /// <summary>
    /// Serialises <paramref name="message"/>, stamps its properties with the message type,
    /// a correlation id (only when none is set) and the alternative message types header,
    /// then hands the raw bytes to the byte-array publish overload.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="exchange"/>, <paramref name="routingKey"/> or <paramref name="message"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="routingKey"/> is longer than 255 characters.</exception>
    /// <exception cref="InvalidOperationException">
    /// The <see cref="ISupersede{T}"/> chain of <typeparamref name="T"/> refers back to itself.
    /// </exception>
    public override Task PublishAsync<T>(IExchange exchange, string routingKey, bool mandatory, bool immediate, IMessage<T> message)
    {
        CheckNotNull(exchange, "exchange");
        CheckShortString(routingKey, "routingKey");
        CheckNotNull(message, "message");

        var typeName = _typeNameSerializer.Serialize(message.Body.GetType());
        var messageBody = _serializer.MessageToBytes(message.Body);

        message.Properties.Type = typeName;
        message.Properties.CorrelationId =
            string.IsNullOrEmpty(message.Properties.CorrelationId)
                ? _getCorrelationId()
                : message.Properties.CorrelationId;

        AddAlternativeMessageTypes(message);

        return PublishAsync(exchange, routingKey, mandatory, immediate, message.Properties, messageBody);
    }

    /// <summary>
    /// Consumes <paramref name="queue"/> with a single typed handler by registering
    /// <paramref name="onMessage"/> and delegating to the handler-registration overload.
    /// </summary>
    /// <remarks>
    /// Declared with <c>new</c>, so it hides rather than overrides the base member.
    /// NOTE(review): presumably IAdvancedBus is re-listed on the class so that calls made
    /// through the interface reach this member - confirm.
    /// </remarks>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public new IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage, Action<IConsumerConfiguration> configure) where T : class
    {
        CheckNotNull(queue, "queue");
        CheckNotNull(onMessage, "onMessage");
        CheckNotNull(configure, "configure");

        return Consume(queue, x => x.Add(onMessage), configure);
    }

    /// <summary>
    /// Consumes <paramref name="queue"/> with the registered handlers and a no-op consumer configuration.
    /// </summary>
    public override IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers)
    {
        return Consume(queue, addHandlers, x => { });
    }

    /// <summary>
    /// Consumes <paramref name="queue"/>, dispatching each delivery to the handler registered
    /// for its resolved message type. The type is resolved by <see cref="GetMessageType"/>,
    /// i.e. the declared type first and then the alternative types from the header.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public new IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers, Action<IConsumerConfiguration> configure)
    {
        CheckNotNull(queue, "queue");
        CheckNotNull(addHandlers, "addHandlers");
        CheckNotNull(configure, "configure");

        var handlerCollection = _handlerCollectionFactory.CreateHandlerCollection();
        addHandlers(handlerCollection);

        return Consume(queue, (body, properties, messageReceivedInfo) =>
        {
            var messageType = GetMessageType(properties);

            // Re-serialise the resolved type: it may be an alternative type rather than the
            // one named in properties.Type, and the body is deserialised as the resolved type.
            var messageTypeString = _typeNameSerializer.Serialize(messageType);
            var handler = handlerCollection.GetHandler(messageType);
            var messageBody = _serializer.BytesToMessage(messageTypeString, body);
            var message = Message.CreateInstance(messageType, messageBody);
            message.SetProperties(properties);

            return handler(message, messageReceivedInfo);
        }, configure);
    }

    /// <summary>
    /// Walks the <see cref="ISupersede{T}"/> chain starting at <typeparamref name="T"/> and
    /// stores the serialised names of all superseded types, newest first and joined with
    /// ";", in the alternative message types header of <paramref name="message"/>.
    /// The header is written even when the chain is empty (as an empty string).
    /// </summary>
    /// <exception cref="InvalidOperationException">The supersede chain contains a cycle.</exception>
    private void AddAlternativeMessageTypes<T>(IMessage<T> message)
    {
        var alternativeMessageTypes = new List<string>();

        // Guards against a chain that loops back on itself (e.g. A supersedes B supersedes A),
        // which would otherwise keep this loop running until memory is exhausted.
        var visitedTypes = new HashSet<Type> { typeof(T) };

        var supersededType = GetSupersededMessageType(typeof(T));
        while (supersededType != null)
        {
            if (!visitedTypes.Add(supersededType))
            {
                throw new InvalidOperationException(
                    string.Format("Message type {0} has a cyclic ISupersede chain involving {1}", typeof(T), supersededType));
            }

            alternativeMessageTypes.Add(_typeNameSerializer.Serialize(supersededType));
            supersededType = GetSupersededMessageType(supersededType);
        }

        var alternativeTypes = string.Join(AlternativeMessageTypeSeparator, alternativeMessageTypes);

        // Assign through the indexer rather than Add: Add throws when the key already exists,
        // which happens when the same message/properties instance is published more than once.
        message.Properties.Headers[AlternativeMessageTypesHeaderKey] = alternativeTypes;
    }

    /// <summary>
    /// Returns the type argument of the first <see cref="ISupersede{T}"/> interface
    /// implemented by <paramref name="messageType"/>, or null when it implements none.
    /// </summary>
    private static Type GetSupersededMessageType(Type messageType)
    {
        return messageType
            .GetInterfaces()
            .Where(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ISupersede<>))
            .SelectMany(t => t.GetGenericArguments())
            .FirstOrDefault();
    }

    /// <summary>
    /// Resolves the CLR type for a received message: the type named in
    /// <paramref name="messageProperties"/> if it can be resolved, otherwise the first
    /// resolvable entry of the alternative message types header.
    /// </summary>
    /// <exception cref="EasyNetQException">
    /// The declared type cannot be resolved and the header is missing, is not a byte array,
    /// or lists no resolvable type.
    /// </exception>
    private Type GetMessageType(MessageProperties messageProperties)
    {
        Type messageType;
        if (TryGetType(messageProperties.Type, out messageType))
        {
            return messageType;
        }

        if (!messageProperties.HeadersPresent || !messageProperties.Headers.ContainsKey(AlternativeMessageTypesHeaderKey))
        {
            throw new EasyNetQException("Cannot find type {0}", messageProperties.Type);
        }

        // The header value is expected to be the raw UTF-8 bytes of the joined type names.
        var alternativeTypeHeaderRaw = messageProperties.Headers[AlternativeMessageTypesHeaderKey] as byte[];
        if (alternativeTypeHeaderRaw == null)
        {
            throw new EasyNetQException("Unable to extract raw {0} header as byte[]", AlternativeMessageTypesHeaderKey);
        }

        // Encoding.GetString never returns null for a non-null array, so no null check is needed.
        var alternativeTypeHeader = Encoding.UTF8.GetString(alternativeTypeHeaderRaw);
        var alternativeTypes = alternativeTypeHeader.Split(new[] { AlternativeMessageTypeSeparator }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var alternativeType in alternativeTypes)
        {
            if (TryGetType(alternativeType, out messageType))
            {
                return messageType;
            }
        }

        throw new EasyNetQException("Cannot find declared message type {0} or any of the specified alternative types {1}", messageProperties.Type, alternativeTypeHeader);
    }

    /// <summary>
    /// Attempts to turn <paramref name="typeString"/> into a type via the type name serializer.
    /// </summary>
    /// <returns>True and the type on success; false and null when deserialisation throws.</returns>
    private bool TryGetType(string typeString, out Type messageType)
    {
        try
        {
            messageType = _typeNameSerializer.DeSerialize(typeString);
            return true;
        }
        catch (Exception)
        {
            // Deliberate best-effort: a failure here only means "this type name is unknown to
            // this consumer", and the caller moves on to the next candidate type.
            messageType = null;
            return false;
        }
    }

    /// <summary>Throws <see cref="ArgumentNullException"/> with a default message when <paramref name="value"/> is null.</summary>
    private static void CheckNotNull<T>(T value, string name) where T : class
    {
        CheckNotNull(value, name, string.Format("{0} must not be null", name));
    }

    /// <summary>
    /// Throws when <paramref name="value"/> is null or longer than 255 characters.
    /// </summary>
    private static void CheckShortString(string value, string name)
    {
        CheckNotNull(value, name);

        if (value.Length > 255)
        {
            throw new ArgumentException(string.Format("Argument '{0}' must be less than or equal to 255 characters.", name));
        }
    }

    /// <summary>
    /// Throws <see cref="ArgumentNullException"/> carrying <paramref name="message"/> when
    /// <paramref name="value"/> is null. <paramref name="name"/> and <paramref name="message"/>
    /// themselves must not be blank.
    /// </summary>
    private static void CheckNotNull<T>(T value, string name, string message) where T : class
    {
        CheckNotBlank(name, "name", "name must not be blank");
        CheckNotBlank(message, "message", "message must not be blank");

        if (value == null)
        {
            throw new ArgumentNullException(name, message);
        }
    }

    /// <summary>
    /// Throws <see cref="ArgumentException"/> when <paramref name="value"/> is null, empty or
    /// whitespace. <paramref name="name"/> and <paramref name="message"/> are validated first.
    /// </summary>
    private static void CheckNotBlank(string value, string name, string message)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("name must not be blank", "name");
        }

        if (string.IsNullOrWhiteSpace(message))
        {
            throw new ArgumentException("message must not be blank", "message");
        }

        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException(message, name);
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Publish exchange declare strategy that understands message versioning. Besides declaring
/// the exchange for a message type, it declares the exchange of every message type that type
/// supersedes (via <see cref="ISupersede{T}"/>) and binds each exchange to the exchange of
/// the version it supersedes with the routing key "#".
/// </summary>
public class VersionedPublishExchangeDeclareStrategy : IPublishExchangeDeclareStrategy
{
    // Exchanges already declared through this strategy instance, keyed by exchange name.
    private readonly ConcurrentDictionary<string, IExchange> exchangeNames =
        new ConcurrentDictionary<string, IExchange>();

    /// <summary>
    /// Returns the exchange previously declared under <paramref name="exchangeName"/>, or
    /// declares it on <paramref name="advancedBus"/> and remembers it when not seen before.
    /// </summary>
    public IExchange DeclareExchange(IAdvancedBus advancedBus, string exchangeName, string exchangeType)
    {
        return exchangeNames.GetOrAdd(
            exchangeName,
            name => advancedBus.ExchangeDeclare(name, exchangeType));
    }

    /// <summary>
    /// Declares the exchange for <paramref name="messageType"/> (named by the bus's
    /// <c>IConventions</c>) together with the exchanges of all superseded message types,
    /// binding each one to its predecessor.
    /// </summary>
    /// <returns>The exchange for <paramref name="messageType"/> itself.</returns>
    /// <exception cref="InvalidOperationException">
    /// The <see cref="ISupersede{T}"/> chain of <paramref name="messageType"/> contains a cycle.
    /// </exception>
    public IExchange DeclareExchange(IAdvancedBus advancedBus, Type messageType, string exchangeType)
    {
        return DeclareVersionedExchange(
            advancedBus,
            messageType,
            exchangeType,
            new System.Collections.Generic.HashSet<Type>());
    }

    // Declares the exchange for one message type, then recurses into the type it supersedes.
    // visitedTypes records the types already on the chain so that a cyclic declaration fails
    // with an exception instead of recursing until the stack overflows.
    private IExchange DeclareVersionedExchange(
        IAdvancedBus advancedBus,
        Type messageType,
        string exchangeType,
        System.Collections.Generic.ISet<Type> visitedTypes)
    {
        if (!visitedTypes.Add(messageType))
        {
            throw new InvalidOperationException(
                string.Format("Message type {0} is part of a cyclic ISupersede chain", messageType));
        }

        var conventions = advancedBus.Container.Resolve<IConventions>();
        var exchangeName = conventions.ExchangeNamingConvention(messageType);
        var publishExchange = DeclareExchange(advancedBus, exchangeName, exchangeType);

        var supersededMessageType = GetSupersededMessageType(messageType);
        if (supersededMessageType != null)
        {
            // The superseded chain is fully declared and bound before this binding is made.
            var supersededExchange = DeclareVersionedExchange(advancedBus, supersededMessageType, exchangeType, visitedTypes);
            advancedBus.Bind(publishExchange, supersededExchange, "#");
        }

        return publishExchange;
    }

    /// <summary>
    /// Returns the type argument of the first <see cref="ISupersede{T}"/> interface
    /// implemented by <paramref name="messageType"/>, or null when it implements none.
    /// </summary>
    private static Type GetSupersededMessageType(Type messageType)
    {
        return messageType
            .GetInterfaces()
            .Where(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ISupersede<>))
            .SelectMany(t => t.GetGenericArguments())
            .FirstOrDefault();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment