Last active
January 11, 2025 20:39
-
-
Save tyb/44efc87eb8052ccf8ce073d66ce67a14 to your computer and use it in GitHub Desktop.
Spring Boot JTA Atomikos & JMS configuration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.xxx.common.config; | |
import com.atomikos.icatch.config.UserTransactionServiceImp; | |
import com.atomikos.icatch.jta.UserTransactionImp; | |
import com.atomikos.icatch.jta.UserTransactionManager; | |
import net.sf.ehcache.transaction.manager.TransactionManagerLookup; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.context.properties.EnableConfigurationProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.DependsOn; | |
import org.springframework.core.env.Environment; | |
import javax.transaction.TransactionManager; | |
import javax.transaction.UserTransaction; | |
import java.util.Properties; | |
@Configuration | |
@DependsOn("initializingBeanImpl") | |
public class AtomikosJtaConfiguration { | |
@Autowired | |
private Environment environment ; | |
@Value("${spring.application.name}") | |
private String APPLICATION_NAME; | |
public void tailorProperties(Properties properties) { | |
properties.setProperty( "hibernate.transaction.manager_lookup_class", | |
TransactionManagerLookup.class.getName()); | |
} | |
@Bean(name="userTransactionServiceImp", initMethod = "init", destroyMethod = "shutdownForce") | |
public UserTransactionServiceImp userTransactionServiceImp() | |
{ | |
Properties properties = new Properties(); | |
properties.setProperty("com.atomikos.icatch.max_timeout", "3600000"); | |
properties.setProperty("com.atomikos.icatch.service", "com.atomikos.icatch.standalone.UserTransactionServiceFactory"); | |
properties.setProperty("com.atomikos.icatch.log_base_name", APPLICATION_NAME); | |
properties.setProperty("com.atomikos.icatch.output_dir", "../standalone/log/"); | |
properties.setProperty("com.atomikos.icatch.log_base_dir", "../standalone/log/"); | |
UserTransactionServiceImp userTransactionServiceImp = new UserTransactionServiceImp(properties); | |
return userTransactionServiceImp; | |
} | |
//@Bean(initMethod = "init", destroyMethod = "close", name = "userTransaction") | |
@DependsOn("userTransactionServiceImp") | |
@Bean(name = "userTransaction") | |
public UserTransaction userTransaction() throws Throwable { | |
UserTransactionImp userTransactionImp = new UserTransactionImp(); | |
userTransactionImp.setTransactionTimeout(1000); | |
return userTransactionImp; | |
} | |
@Bean(initMethod = "init", destroyMethod = "close", name = "transactionManager") | |
@DependsOn("userTransactionServiceImp") | |
//@Bean(name = "transactionManager") | |
public TransactionManager transactionManager() throws Throwable { | |
UserTransactionManager userTransactionManager = new UserTransactionManager(); | |
userTransactionManager.setForceShutdown(false); | |
return userTransactionManager; | |
} | |
} | |
-- | |
package com.xxx.common.config; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.DependsOn; | |
import org.springframework.context.annotation.Primary; | |
import org.springframework.transaction.PlatformTransactionManager; | |
import org.springframework.transaction.jta.JtaTransactionManager; | |
import javax.transaction.TransactionManager; | |
import javax.transaction.UserTransaction; | |
@Configuration | |
public class TxConfig { | |
@Autowired | |
private AtomikosJtaConfiguration jtaConfiguration ; | |
@Bean(name = "platformTransactionManager") | |
@DependsOn("transactionManager") | |
@Primary | |
public PlatformTransactionManager platformTransactionManager() throws Throwable { | |
UserTransaction userTransaction = jtaConfiguration.userTransaction() ; | |
TransactionManager transactionManager = jtaConfiguration.transactionManager() ; | |
return new JtaTransactionManager( userTransaction, transactionManager ); | |
} | |
} | |
--- | |
package com.xxx.common.config; | |
import com.atomikos.jms.AtomikosConnectionFactoryBean; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.databind.SerializationFeature; | |
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import org.apache.activemq.ActiveMQXAConnectionFactory; | |
import org.apache.activemq.command.ActiveMQQueue; | |
import org.apache.activemq.command.ActiveMQTopic; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; | |
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer; | |
import org.springframework.context.annotation.*; | |
import org.springframework.jms.annotation.EnableJms; | |
import org.springframework.jms.annotation.JmsListenerConfigurer; | |
import org.springframework.jms.config.DefaultJmsListenerContainerFactory; | |
import org.springframework.jms.config.JmsListenerEndpointRegistrar; | |
import org.springframework.jms.connection.CachingConnectionFactory; | |
import org.springframework.jms.core.JmsTemplate; | |
import org.springframework.jms.support.converter.MessageConversionException; | |
import org.springframework.jms.support.converter.MessageConverter; | |
import org.springframework.jms.support.destination.DestinationResolver; | |
import org.springframework.jms.support.destination.DynamicDestinationResolver; | |
//import org.springframework.jms.support.destination.JndiDestinationResolver; | |
import org.springframework.stereotype.Component; | |
import org.springframework.transaction.annotation.EnableTransactionManagement; | |
import org.springframework.util.ErrorHandler; | |
import javax.inject.Inject; | |
import javax.jms.*; | |
@Configuration | |
@DependsOn("platformTransactionManager") | |
@EnableJms | |
@EnableTransactionManagement | |
@EnableAutoConfiguration | |
@PropertySource(ignoreResourceNotFound = true, value = "classpath:common.properties") | |
public class JMSConfig implements JmsListenerConfigurer { | |
@Value("${activemq.broker-url}") | |
String brokerURL; | |
@Value("${activemq.user}") | |
String userName; | |
@Value("${activemq.password}") | |
String password; | |
@Value("${trans.alert.topic}") | |
private String transactionAlertTopicName; | |
@Value("${trans.alert.email.queue}") | |
private String emailNotificationQueueName; | |
@Value("${trans.alert.sms.queue}") | |
private String smsNotificationQueueName; | |
@Bean | |
public Topic transactionAlertTopic() { | |
return new ActiveMQTopic(transactionAlertTopicName); | |
} | |
@Bean | |
public Queue emailNotificationQueue() { | |
return new ActiveMQQueue(emailNotificationQueueName); | |
} | |
@Bean | |
public Queue smsNotificationQueue() { | |
return new ActiveMQQueue(smsNotificationQueueName); | |
} | |
@Inject | |
private AtomikosJtaConfiguration jtaConfiguration; | |
@Bean("queueConnectionFactory") | |
public QueueConnectionFactory queueConnectionFactory() { | |
ActiveMQConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(); | |
//QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(); | |
queueConnectionFactory.setTrustAllPackages(true); | |
queueConnectionFactory.setBrokerURL(brokerURL); | |
queueConnectionFactory.setUserName(userName); | |
queueConnectionFactory.setPassword(password); | |
return queueConnectionFactory; | |
} | |
//pubsubdomain i belirtmemiz gerekeceğinden ayrıştırmamız gerekirdi. | |
@Bean(initMethod = "init", destroyMethod = "close", name = "xaTopicConnectionFactory") | |
public ConnectionFactory xaTopicConnectionFactory() { | |
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(); | |
activeMQXAConnectionFactory.setBrokerURL(brokerURL); | |
activeMQXAConnectionFactory.setUserName(userName); | |
activeMQXAConnectionFactory.setPassword(password); | |
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean(); | |
atomikosConnectionFactoryBean.setUniqueResourceName("xaTopic"); | |
atomikosConnectionFactoryBean.setLocalTransactionMode(false); | |
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory); | |
return atomikosConnectionFactoryBean; | |
} | |
@Bean(initMethod = "init", destroyMethod = "close", name = "xaQueueConnectionFactory") | |
public ConnectionFactory xaQueueConnectionFactory() { | |
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(); | |
activeMQXAConnectionFactory.setBrokerURL(brokerURL); | |
activeMQXAConnectionFactory.setUserName(userName); | |
activeMQXAConnectionFactory.setPassword(password); | |
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean(); | |
atomikosConnectionFactoryBean.setUniqueResourceName("xaQueue"); | |
atomikosConnectionFactoryBean.setLocalTransactionMode(false); | |
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory); | |
return atomikosConnectionFactoryBean; | |
} | |
@Bean("topicConnectionFactory") | |
@Primary | |
//TODO: Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed | |
public TopicConnectionFactory topicConnectionFactory() { | |
ActiveMQConnectionFactory topicConnectionFactory = new ActiveMQConnectionFactory(); | |
topicConnectionFactory.setBrokerURL(brokerURL); | |
//TODO: servislerin application.properties'inden değil de common'daki properties den alıyor. | |
topicConnectionFactory.setUserName(userName); | |
topicConnectionFactory.setPassword(password); | |
topicConnectionFactory.setTrustAllPackages(true); | |
topicConnectionFactory.setUseAsyncSend(true); | |
// CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); | |
return topicConnectionFactory; | |
} | |
//PooledConnectionFactory vendor specific. CachingConnectionFactory ise Spring'e ait. | |
//JBOSS'un kullandığı ise ActiveMQManagedConnectionFactory o da container specific. | |
/* | |
Without connection pooling, JMSTemplate, by default, creates a new connection, session, | |
producer for each message sent and then closes them all again. | |
This results in a long workaround time and you getting reconnected for every JMS message sent. | |
*/ | |
@Bean(name = "cachingQueueContainerFactory") | |
public CachingConnectionFactory cachingQueueConnectionFactory() { | |
CachingConnectionFactory cachingConnectionFactory = | |
new CachingConnectionFactory(queueConnectionFactory()); | |
cachingConnectionFactory.setSessionCacheSize(10); | |
return cachingConnectionFactory; | |
} | |
@Bean(name = "cachingTopicContainerFactory") | |
public CachingConnectionFactory cachingTopicConnectionFactory() { | |
CachingConnectionFactory cachingConnectionFactory = | |
new CachingConnectionFactory(topicConnectionFactory()); | |
cachingConnectionFactory.setSessionCacheSize(10); | |
return cachingConnectionFactory; | |
} | |
//TODO: listener/consumer için. Ayrı bir config sınıfına alınabilir. | |
@Bean(name = "queueContainerFactory") | |
public DefaultJmsListenerContainerFactory queueJmsListenerContainerFactory( | |
@Qualifier("queueConnectionFactory") ConnectionFactory queueConnectionFactory, | |
DefaultJmsListenerContainerFactoryConfigurer configurer) { | |
DefaultJmsListenerContainerFactory queueListenerFactory = new DefaultJmsListenerContainerFactory(); | |
configurer.configure(queueListenerFactory, queueConnectionFactory); | |
queueListenerFactory.setConnectionFactory(queueConnectionFactory); | |
queueListenerFactory.setPubSubDomain(false); | |
queueListenerFactory.setMessageConverter(jacksonJmsMessageConverter()); | |
queueListenerFactory.setAutoStartup(true); | |
queueListenerFactory.setDestinationResolver(destinationResolver()); | |
queueListenerFactory.setConcurrency("1"); | |
queueListenerFactory.setErrorHandler( | |
new ErrorHandler() { | |
@Override | |
public void handleError(Throwable t) { | |
System.err.println("An error has occurred in the transaction"); | |
t.printStackTrace(); | |
} | |
} | |
); | |
return queueListenerFactory; | |
} | |
//TODO: listener/consumer için. Ayrı bir config sınıfına alınabilir. | |
@Bean(name = "topicContainerFactory") | |
public DefaultJmsListenerContainerFactory topicJmsListenerContainerFactory() { | |
// @Qualifier("topicConnectionFactory") ConnectionFactory topicConnectionFactory, | |
// DefaultJmsListenerContainerFactoryConfigurer configurer) { | |
DefaultJmsListenerContainerFactory topicListenerFactory = new DefaultJmsListenerContainerFactory(); | |
//configurer.configure(topicListenerFactory, topicConnectionFactory); | |
// configurer.configure(topicListenerFactory, cachingConnectionFactory()); | |
//topicListenerFactory.setConnectionFactory(topicConnectionFactory); | |
topicListenerFactory.setConnectionFactory(cachingTopicConnectionFactory()); | |
//TODO:Used by JmsTemplate for resolving destination names from simple Strings to actual Destination implementation instances. | |
topicListenerFactory.setDestinationResolver(destinationResolver()); | |
topicListenerFactory.setPubSubDomain(true); | |
topicListenerFactory.setMessageConverter(jacksonJmsMessageConverter()); | |
topicListenerFactory.setAutoStartup(true); | |
topicListenerFactory.setConcurrency("1"); | |
topicListenerFactory.setReceiveTimeout(10000l); | |
topicListenerFactory.setErrorHandler( | |
new ErrorHandler() { | |
@Override | |
public void handleError(Throwable t) { | |
System.err.println("An error has occurred in the transaction"); | |
t.printStackTrace(); | |
} | |
} | |
); | |
//factory.setErrorHandler(t -> System.err.println("An error has occurred in the transaction")); | |
return topicListenerFactory; | |
} | |
@Override | |
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { | |
registrar.setContainerFactory(topicJmsListenerContainerFactory()); | |
} | |
//TODO: publisher/producer için. Ay rı bir config sınıfına alınabilir. | |
@Bean(name = "topicJmsTemplate") | |
public JmsTemplate topicJmsTemplate(@Qualifier("topicConnectionFactory") ConnectionFactory topicConnectionFactory) { | |
JmsTemplate topicJmsTemplate = new JmsTemplate(); | |
topicJmsTemplate.setConnectionFactory(cachingTopicConnectionFactory()); | |
topicJmsTemplate.setReceiveTimeout(10000); | |
topicJmsTemplate.setMessageConverter(jacksonJmsMessageConverter()); | |
topicJmsTemplate.setDestinationResolver(destinationResolver()); | |
/*topicJmsTemplate.setDestinationResolver(jndiDestinationResolver());*/ | |
return topicJmsTemplate; | |
} | |
@Bean | |
public DestinationResolver destinationResolver() { | |
return new DynamicDestinationResolver(); | |
} | |
@Bean // Serialize message content to json using TextMessage | |
public MessageConverter jacksonJmsMessageConverter() { | |
/* | |
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); | |
converter.setTargetType(MessageType.TEXT); | |
converter.setTypeIdPropertyName("_type"); | |
converter.setObjectMapper(objectMapper()); | |
*/ | |
JsonMessageConverter converter = new JsonMessageConverter(); | |
return converter; | |
} | |
@Bean | |
public ObjectMapper objectMapper(){ | |
ObjectMapper mapper = new ObjectMapper(); | |
mapper.registerModule(new JavaTimeModule()); | |
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); | |
return mapper; | |
} | |
@Component | |
private class JsonMessageConverter implements MessageConverter { | |
@Autowired | |
private ObjectMapper mapper; | |
JsonMessageConverter() {} | |
/** | |
* Converts message to JSON. Used mostly by {@link org.springframework.jms.core.JmsTemplate} | |
*/ | |
@Override | |
public javax.jms.Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { | |
String json; | |
try { | |
json = mapper.writeValueAsString(object); | |
} catch (Exception e) { | |
throw new MessageConversionException("Message cannot be parsed. ", e); | |
} | |
TextMessage message = session.createTextMessage(); | |
message.setText(json); | |
return message; | |
} | |
/** | |
* Extracts JSON payload for further processing by JacksonMapper. | |
*/ | |
@Override | |
public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException { | |
return ((TextMessage) message).getText(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment