Created
January 28, 2020 17:25
-
-
Save monodot/826787a58e9b39afc5bd9dcfab3ada12 to your computer and use it in GitHub Desktop.
Fuse/Camel - Wiring up AMQ (ActiveMQ) and Oracle AQ with JTA XA transactions (Narayana) on Spring Boot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xyz.tomd.demos.fuse.springboot.amqxa; | |
import oracle.jdbc.xa.client.OracleXADataSource; | |
import oracle.jms.AQjmsFactory; | |
import org.apache.camel.component.jms.JmsComponent; | |
import org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.boot.jta.XAConnectionFactoryWrapper; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.jms.connection.JmsTransactionManager; | |
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; | |
import org.springframework.transaction.PlatformTransactionManager; | |
import javax.jms.*; | |
import javax.transaction.TransactionManager; | |
import java.sql.SQLException; | |
import java.util.Properties; | |
@SpringBootApplication | |
public class Application { | |
/** | |
* A main method to start this application. | |
*/ | |
public static void main(String[] args) { | |
SpringApplication.run(Application.class, args); | |
} | |
@Bean(name = "jms") | |
JmsComponent jmsComponent(ConnectionFactory activemqCF, PlatformTransactionManager jmstx) { | |
JmsComponent jms = new JmsComponent(); | |
// This should be our ActiveMQ XA connection factory | |
jms.setConnectionFactory(activemqCF); | |
jms.setCacheLevelName("CACHE_CONSUMER"); | |
// Camel uses Spring JMS under the covers - so it expects a (Spring) PlatformTransactionManager | |
jms.setTransactionManager(jmstx); | |
jms.setTransacted(true); | |
return jms; | |
} | |
@Bean(name = "oracleaq") | |
JmsComponent oracleAQJmsComponent(PlatformTransactionManager transactionManager, | |
TransactionManager jtaTransactionManager) | |
throws JMSException, SQLException { | |
OracleXADataSource oracleXADataSource = new OracleXADataSource(); | |
oracleXADataSource.setURL("jdbc:oracle:thin:@localhost:1521:ORCLCDB"); | |
oracleXADataSource.setUser("scott"); | |
oracleXADataSource.setPassword("tiger"); | |
// Now we've created the XA datasource, we need something that will generate an XAConnectionFactory | |
XAConnectionFactory oracleXACF = AQjmsFactory.getXAConnectionFactory(oracleXADataSource); | |
// Now we need to wrap this connection factory in an enlisting connection factory | |
// Related reading: https://access.redhat.com/documentation/en-us/red_hat_fuse/7.2/html-single/apache_karaf_transaction_guide/index#about_auto_enlistment | |
JmsPoolXAConnectionFactory pooledJmsXACF = new JmsPoolXAConnectionFactory(); | |
pooledJmsXACF.setConnectionFactory(oracleXACF); | |
// Wire the connection factory to Narayana via its JTA interface implementation | |
pooledJmsXACF.setTransactionManager(jtaTransactionManager); | |
JmsComponent jms = new JmsComponent(); | |
jms.setConnectionFactory(pooledJmsXACF); | |
// Wire the Camel JMS component to Narayana via its Spring interface implementation | |
jms.setTransactionManager(transactionManager); | |
jms.setTransacted(false); | |
return jms; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# By simply setting these values, Spring Boot will create an ActiveMQ connection factory | |
# We will then attach this connection factory to a Camel JmsComponent (see Application.java) | |
spring.activemq.broker-url=tcp://localhost:61616 | |
spring.activemq.user=admin | |
spring.activemq.password=admin | |
# More autoconfiguration magic.... Spring Boot will create a datasource for us with these properties | |
spring.datasource.url=jdbc:postgresql://localhost:5432/sampledb | |
spring.datasource.username=admin | |
spring.datasource.password=admin | |
#debug=true | |
logging.level.org.apache.camel.component.jms=DEBUG | |
# Cluster (options replaced by Openshift env variables) | |
cluster.name=@project.artifactId@ | |
cluster.nodename=${cluster.name}-0 | |
cluster.base-dir=./target/tx | |
# Recovery is enabled only inside Openshift | |
cluster.recovery.enabled=false | |
# Transaction data | |
spring.jta.transaction-manager-id=${cluster.nodename} | |
spring.jta.log-dir=${cluster.base-dir}/store/${cluster.nodename} | |
# Transaction recovery settings | |
snowdrop.narayana.openshift.recovery.enabled=${cluster.recovery.enabled} | |
snowdrop.narayana.openshift.recovery.current-pod-name=${cluster.nodename} | |
snowdrop.narayana.openshift.recovery.statefulset=${cluster.name} | |
snowdrop.narayana.openshift.recovery.status-dir=${cluster.base-dir}/status |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xyz.tomd.demos.fuse.springboot.amqxa; | |
import org.apache.camel.builder.RouteBuilder; | |
import org.springframework.stereotype.Component; | |
@Component | |
public class XaApplicationRouteBuilder extends RouteBuilder { | |
public void configure() throws Exception { | |
from("oracleaq:FOOQUEUE") | |
.log("Received a message from Oracle AQ! - ${body}") | |
.setHeader("name", body()) | |
.to("sql:insert into chickens (name) values (:#name)") | |
.log("Inserted a chicken into the database") | |
.log("Sending to ActiveMQ...") | |
.to("jms:queue:CHICKENS.PROCESSED"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment