Skip to content

Instantly share code, notes, and snippets.

@juliofalbo
Last active May 8, 2019 19:38
Show Gist options
  • Save juliofalbo/cf222efbd55ff70f9300b75477edd4f4 to your computer and use it in GitHub Desktop.
Save juliofalbo/cf222efbd55ff70f9300b75477edd4f4 to your computer and use it in GitHub Desktop.
RabbitMQ Custom Autoconfig
package com.julio.poc.aync.config;
import static java.util.stream.Collectors.groupingBy;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AutowireCandidateQualifier;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.AbstractEnvironment;
import org.springframework.core.io.support.ResourcePropertySource;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.julio.poc.aync.Constants;
import com.julio.poc.aync.config.properties.generic.QueueProperties;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Configuration
@Slf4j
@PropertySource(RabbitMQConfig.CLASSPATH_QUEUES_PROPERTIES)
public class RabbitMQConfig implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {
@Value("${rabbitmq.port}")
private int port;
private ApplicationContext context;
protected static final String QUEUS_CONFIG_NAME_FILE = "queues-config.properties";
protected static final String CLASSPATH_QUEUES_PROPERTIES = "classpath:queues/" + QUEUS_CONFIG_NAME_FILE;
@Bean
public MessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
private void createRabbitMQArch(@NonNull final QueueProperties property, BeanDefinitionRegistry registry) {
final AtomicBoolean primary = new AtomicBoolean(true);
Stream.of(property.getQueue(), property.getQueueRetry(), property.getQueueDlq())
.forEach(queue -> Optional.ofNullable(createConnectionsFactoryBean(property, queue, primary.get())).ifPresent(beanDefinitionConnectionFactory -> {
String connectionFactoryBeanName = getConnectionFactoryBeanName(queue);
registry.registerBeanDefinition(connectionFactoryBeanName, beanDefinitionConnectionFactory);
log.info("M=RabbitMQConfig.createRabbitMQArch, I=ConnectionFactory Bean with name {} was created for the queue {}", connectionFactoryBeanName, queue);
if (!queue.contains("retry") && !queue.contains("dlq")) {
String listenerContainerFactoryBeanName = getSimpleRabbitListenerContainerFactoryBean(queue);
BeanDefinition simpleRabbitListenerContainerFactoryBeanDef = createSimpleRabbitListenerContainerFactoryBean(property,
(CachingConnectionFactory) context.getBean(connectionFactoryBeanName), primary.get());
registry.registerBeanDefinition(listenerContainerFactoryBeanName, simpleRabbitListenerContainerFactoryBeanDef);
log.info("M=RabbitMQConfig.createRabbitMQArch, I=SimpleRabbitListenerContainerFactory Bean with name {} was created for the queue {}", listenerContainerFactoryBeanName, queue);
}
BeanDefinition beanDefinitionRabbitAdmin = createRabbitAdminBean(connectionFactoryBeanName, primary.get());
String rabbitAdminBeanName = getRabbitAdminBeanName(queue);
registry.registerBeanDefinition(rabbitAdminBeanName, beanDefinitionRabbitAdmin);
log.info("M=RabbitMQConfig.createRabbitMQArch, I=RabbitAdmin Bean with name {} was created for the queue {}", rabbitAdminBeanName, queue);
BeanDefinition beanDefinitionRabbitTemplate = createRabbitTemplateBean(connectionFactoryBeanName, primary.get());
String rabbitTemplateBeanName = getRabbitTemplateBeanName(queue);
registry.registerBeanDefinition(rabbitTemplateBeanName, beanDefinitionRabbitTemplate);
log.info("M=RabbitMQConfig.createRabbitMQArch, I=RabbitTemplate Bean with name {} was created for the queue {}", rabbitTemplateBeanName, queue);
primary.set(false);
}));
}
private BeanDefinitionBuilder createBeanDefinitionBase(final Class type, final boolean isPrimary) {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(type);
if (isPrimary) {
beanDefinitionBuilder.getBeanDefinition().addQualifier(new AutowireCandidateQualifier(Primary.class));
beanDefinitionBuilder.getBeanDefinition().setPrimary(true);
}
beanDefinitionBuilder.setScope(SCOPE_SINGLETON);
return beanDefinitionBuilder;
}
private BeanDefinition createConnectionsFactoryBean(@NonNull final QueueProperties property, String queue, boolean isPrimary) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
// factory.useSslProtocol(TLSContextUtil.tls12ContextFromPKCS12(property.getTlsKeystore().getInputStream(),
// property.getTlsKeystorePassword().toCharArray()));
factory.setHost(context.getEnvironment().getProperty("rabbitmq.host"));
factory.setPort(Integer.valueOf(context.getEnvironment().getProperty("rabbitmq.port")));
factory.setAutomaticRecoveryEnabled(property.isAutomaticRecovery());
Optional.ofNullable(property.getVirtualHost()).ifPresent(factory::setVirtualHost);
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(CachingConnectionFactory.class, isPrimary);
return beanDefinitionBuilder.addConstructorArgValue(factory).getBeanDefinition();
} catch (Exception e) {
log.error(String.format("It is not possible create a Connection Factory to Queue %s", queue), e);
return null;
}
}
private BeanDefinition createSimpleRabbitListenerContainerFactoryBean(@NonNull final QueueProperties property, CachingConnectionFactory connectionFactory, boolean isPrimary) {
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(SimpleRabbitListenerContainerFactory.class, isPrimary);
beanDefinitionBuilder.addPropertyValue("connectionFactory", connectionFactory)
.addPropertyValue("messageConverter", producerJackson2MessageConverter());
beanDefinitionBuilder.addPropertyValue("concurrentConsumers", property.getConcurrentConsumers());
beanDefinitionBuilder.addPropertyValue("maxConcurrentConsumers", property.getMaxConcurrentConsumers());
return beanDefinitionBuilder.getBeanDefinition();
}
private BeanDefinition createRabbitAdminBean(String connectionFactoryBeanName, boolean isPrimary) {
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(RabbitAdmin.class, isPrimary);
beanDefinitionBuilder.addConstructorArgReference(connectionFactoryBeanName);
return beanDefinitionBuilder.getBeanDefinition();
}
private BeanDefinition createRabbitTemplateBean(String connectionFactoryBeanName, boolean isPrimary) {
BeanDefinitionBuilder beanDefinitionBuilder = createBeanDefinitionBase(RabbitTemplate.class, isPrimary);
beanDefinitionBuilder.addConstructorArgReference(connectionFactoryBeanName);
return beanDefinitionBuilder.getBeanDefinition();
}
private String getConnectionFactoryBeanName(String key) {
return Constants.PREFIX_CONNECTION_FACTORY_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key);
}
private String getRabbitTemplateBeanName(String key) {
return Constants.PREFIX_RABBIT_TEMPLATE_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key);
}
private String getRabbitAdminBeanName(String key) {
return Constants.PREFIX_RABBIT_ADMIN_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key);
}
private String getSimpleRabbitListenerContainerFactoryBean(String key) {
return Constants.PREFIX_SIMPLE_RABBIT_LISTENER_CONTAINER_FACTORY_PER_QUEUE + CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, key);
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
bindProperties().forEach(property -> createRabbitMQArch(property, registry));
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
private List<QueueProperties> bindProperties() {
Map<String, Object> map = new HashMap();
((AbstractEnvironment) context.getEnvironment()).getPropertySources()
.stream()
.filter(propertySource -> propertySource instanceof ResourcePropertySource)
.map(propertySource -> (ResourcePropertySource) propertySource)
.map(org.springframework.core.env.PropertySource::getSource)
.forEach(map::putAll);
String prefix = "rabbitmq.custom.configs";
Map<String, Object> collect = map.entrySet().stream()
.filter(e -> e.getKey().startsWith(prefix))
.collect(Collectors.toMap(o -> o.getKey().replace(prefix + ".", ""), Map.Entry::getValue));
ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return collect.entrySet().stream().collect(groupingBy(o -> o.getKey().split("\\.")[0]))
.entrySet().stream()
.map(stringListEntry -> stringListEntry.getValue().stream()
.collect(Collectors.toMap(o -> o.getKey().replace(stringListEntry.getKey() + ".", ""), o -> {
String newValue = context.getEnvironment().getProperty(o.getValue().toString().replace("$", "").replace("{", "").replace("}", ""));
return Objects.requireNonNullElse(newValue, o.getValue());
})))
.map(stringObjectMap -> objectMapper.convertValue(stringObjectMap, QueueProperties.class)).collect(Collectors.toList());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment