Last active
May 8, 2019 19:38
-
-
Save juliofalbo/cf222efbd55ff70f9300b75477edd4f4 to your computer and use it in GitHub Desktop.
RabbitMQ Custom Autoconfig
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.julio.poc.aync.config; | |
import static java.util.stream.Collectors.groupingBy; | |
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.Optional; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; | |
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | |
import org.springframework.amqp.rabbit.core.RabbitAdmin; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; | |
import org.springframework.amqp.support.converter.MessageConverter; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.beans.factory.config.BeanDefinition; | |
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; | |
import org.springframework.beans.factory.support.AutowireCandidateQualifier; | |
import org.springframework.beans.factory.support.BeanDefinitionBuilder; | |
import org.springframework.beans.factory.support.BeanDefinitionRegistry; | |
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.ApplicationContextAware; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Primary; | |
import org.springframework.context.annotation.PropertySource; | |
import org.springframework.core.env.AbstractEnvironment; | |
import org.springframework.core.io.support.ResourcePropertySource; | |
import com.fasterxml.jackson.databind.DeserializationFeature; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.google.common.base.CaseFormat; | |
import com.rabbitmq.client.ConnectionFactory; | |
import com.rabbitmq.client.DefaultSaslConfig; | |
import com.julio.poc.aync.Constants; | |
import com.julio.poc.aync.config.properties.generic.QueueProperties; | |
import lombok.NonNull; | |
import lombok.extern.slf4j.Slf4j; | |
@Configuration | |
@Slf4j | |
@PropertySource(RabbitMQConfig.CLASSPATH_QUEUES_PROPERTIES) | |
public class RabbitMQConfig implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware { | |
@Value("${rabbitmq.port}") | |
private int port; | |
private ApplicationContext context; | |
protected static final String QUEUS_CONFIG_NAME_FILE = "queues-config.properties"; | |
protected static final String CLASSPATH_QUEUES_PROPERTIES = "classpath:queues/" + QUEUS_CONFIG_NAME_FILE; | |
@Bean | |
public MessageConverter producerJackson2MessageConverter() { | |
return new Jackson2JsonMessageConverter(); | |
} | |
private void createRabbitMQArch(@NonNull final QueueProperties property, BeanDefinitionRegistry registry) { | |
final AtomicBoolean primary = new AtomicBoolean(true); | |
Stream.of(property.getQueue(), property.getQueueRetry(), property.getQueueDlq()) | |
.forEach(queue -> Optional.ofNullable(createConnectionsFactoryBean(property, queue, primary.get())).ifPresent(beanDefinitionConnectionFactory -> { | |
String connectionFactoryBeanName = getConnectionFactoryBeanName(queue); | |
registry.registerBeanDefinition(connectionFactoryBeanName, beanDefinitionConnectionFactory); | |
log.info("M=RabbitMQConfig.createRabbitMQArch, I=ConnectionFactory Bean with name {} was created for the queue {}", connectionFactoryBeanName, queue); | |
if (!queue.contains("retry") && !queue.contains("dlq")) { | |
String listenerContainerFactoryBeanName = getSimpleRabbitListenerContainerFactoryBean(queue); | |
BeanDefinition simpleRabbitListenerContainerFactoryBeanDef = createSimpleRabbitListenerContainerFactoryBean(property, | |
(CachingConnectionFactory) context.getBean(connectionFactoryBeanName), primary.get()); | |
registry.registerBeanDefinition(listenerContainerFactoryBeanName, simpleRabbitListenerContainerFactoryBeanDef); | |
log.info("M=RabbitMQConfig.createRabbitMQArch, I=SimpleRabbitListenerContainerFactory Bean with name {} was created for the queue {}", listenerContainerFactoryBeanName, queue); | |
} | |
BeanDefinition beanDefinitionRabbitAdmin = createRabbitAdminBean(connectionFactoryBeanName, primary.get()); | |
String rabbitAdminBeanName = getRabbitAdminBeanName(queue); | |
registry.registerBeanDefinition(rabbitAdminBeanName, beanDefinitionRabbitAdmin); | |
log.info("M=RabbitMQConfig.createRabbitMQArch, I=RabbitAdmin Bean with name {} was created for the queue {}", rabbitAdminBeanName, queue); | |
BeanDefinition beanDefinitionRabbitTemplate = createRabbitTemplateBean(connectionFactoryBeanName, primary.get()); | |
String rabbitTemplateBeanName = getRabbitTemplateBeanName(queue); | |
registry.registerBeanDefinition(rabbitTemplateBeanName, beanDefinitionRabbitTemplate); | |
log.info("M=RabbitMQConfig.createRabbitMQArch, I=RabbitTemplate Bean with name {} was created for the queue {}", rabbitTemplateBeanName, queue); | |
primary.set(false); | |
})); | |
} | |
private BeanDefinitionBuilder createBeanDefinitionBase(final Class type, final boolean isPrimary) { | |
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(type); | |
if (isPrimary) { | |
beanDefinitionBuilder.getBeanDefinition().addQualifier(new AutowireCandidateQualifier(Primary.class)); | |
beanDefinitionBuilder.getBeanDefinition().setPrimary(true); | |
} | |
beanDefinitionBuilder.setScope(SCOPE_SINGLETON); | |
return beanDefinitionBuilder; | |
} | |
private BeanDefinition createConnectionsFactoryBean(@NonNull final QueueProperties property, String queue, boolean isPrimary) { | |
try { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); | |
// factory.useSslProtocol(TLSContextUtil.tls12ContextFromPKCS12(property.getTlsKeystore().getInputStream(), | |
// property.getTlsKeystorePassword().toCharArray())); | |
factory.setHost(context.getEnvironment().getProperty("rabbitmq.host")); | |
factory.setPort(Integer.valueOf(context.getEnvironment().getProperty("rabbitmq.port"))); | |
factory.setAutomaticRecoveryEnabled(property.isAutomaticRecovery()); | |
Optional.ofNullable(property.getVirtualHost()).ifPresent(factory::setVirtualHost); | |
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(CachingConnectionFactory.class, isPrimary); | |
return beanDefinitionBuilder.addConstructorArgValue(factory).getBeanDefinition(); | |
} catch (Exception e) { | |
log.error(String.format("It is not possible create a Connection Factory to Queue %s", queue), e); | |
return null; | |
} | |
} | |
private BeanDefinition createSimpleRabbitListenerContainerFactoryBean(@NonNull final QueueProperties property, CachingConnectionFactory connectionFactory, boolean isPrimary) { | |
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(SimpleRabbitListenerContainerFactory.class, isPrimary); | |
beanDefinitionBuilder.addPropertyValue("connectionFactory", connectionFactory) | |
.addPropertyValue("messageConverter", producerJackson2MessageConverter()); | |
beanDefinitionBuilder.addPropertyValue("concurrentConsumers", property.getConcurrentConsumers()); | |
beanDefinitionBuilder.addPropertyValue("maxConcurrentConsumers", property.getMaxConcurrentConsumers()); | |
return beanDefinitionBuilder.getBeanDefinition(); | |
} | |
private BeanDefinition createRabbitAdminBean(String connectionFactoryBeanName, boolean isPrimary) { | |
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(RabbitAdmin.class, isPrimary); | |
beanDefinitionBuilder.addConstructorArgReference(connectionFactoryBeanName); | |
return beanDefinitionBuilder.getBeanDefinition(); | |
} | |
private BeanDefinition createRabbitTemplateBean(String connectionFactoryBeanName, boolean isPrimary) { | |
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(RabbitTemplate.class, isPrimary); | |
beanDefinitionBuilder.addConstructorArgReference(connectionFactoryBeanName); | |
return beanDefinitionBuilder.getBeanDefinition(); | |
} | |
private String getConnectionFactoryBeanName(String key) { | |
return Constants.PREFIX_CONNECTION_FACTORY_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key); | |
} | |
private String getRabbitTemplateBeanName(String key) { | |
return Constants.PREFIX_RABBIT_TEMPLATE_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key); | |
} | |
private String getRabbitAdminBeanName(String key) { | |
return Constants.PREFIX_RABBIT_ADMIN_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key); | |
} | |
private String getSimpleRabbitListenerContainerFactoryBean(String key) { | |
return Constants.PREFIX_SIMPLE_RABBIT_LISTENER_CONTAINER_FACTORY_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key); | |
} | |
@Override | |
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { | |
bindProperties().forEach(property -> createRabbitMQArch(property, registry)); | |
} | |
@Override | |
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { | |
} | |
@Override | |
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
this.context = applicationContext; | |
} | |
private List<QueueProperties> bindProperties() { | |
Map<String, Object> map = new HashMap(); | |
((AbstractEnvironment) context.getEnvironment()).getPropertySources() | |
.stream() | |
.filter(propertySource -> propertySource instanceof ResourcePropertySource) | |
.map(propertySource -> (ResourcePropertySource) propertySource) | |
.map(org.springframework.core.env.PropertySource::getSource) | |
.forEach(map::putAll); | |
String prefix = "rabbitmq.custom.configs"; | |
Map<String, Object> collect = map.entrySet().stream() | |
.filter(e -> e.getKey().startsWith(prefix)) | |
.collect(Collectors.toMap(o -> o.getKey().replace(prefix + ".", ""), Map.Entry::getValue)); | |
ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | |
return collect.entrySet().stream().collect(groupingBy(o -> o.getKey().split("\\.")[0])) | |
.entrySet().stream() | |
.map(stringListEntry -> stringListEntry.getValue().stream() | |
.collect(Collectors.toMap(o -> o.getKey().replace(stringListEntry.getKey() + ".", ""), o -> { | |
String newValue = context.getEnvironment().getProperty(o.getValue().toString().replace("$", "").replace("{", "").replace("}", "")); | |
return Objects.requireNonNullElse(newValue, o.getValue()); | |
}))) | |
.map(stringObjectMap -> objectMapper.convertValue(stringObjectMap, QueueProperties.class)).collect(Collectors.toList()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment