Last active
June 28, 2021 01:37
-
-
Save garyrussell/6594677 to your computer and use it in GitHub Desktop.
Demonstration of Synchronous Transactions with Spring-AMQP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package foo; | |
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | |
import org.springframework.amqp.rabbit.connection.ConnectionFactory; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.transaction.annotation.EnableTransactionManagement; | |
/** | |
* @author Gary Russell | |
* | |
*/ | |
@Configuration | |
@EnableTransactionManagement | |
public class Config { | |
@Bean | |
public RabbitTemplate rabbitTemplate() { | |
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); | |
rabbitTemplate.setRoutingKey("txTestQ2"); | |
rabbitTemplate.setQueue("txTestQ1"); | |
rabbitTemplate.setMandatory(true); | |
rabbitTemplate.setChannelTransacted(true); | |
return rabbitTemplate; | |
} | |
@Bean | |
public ConnectionFactory connectionFactory() { | |
return new CachingConnectionFactory(); | |
} | |
@Bean | |
public RabbitTransactionManager transactionManager() { | |
return new RabbitTransactionManager(connectionFactory()); | |
} | |
@Bean | |
public Service service() { | |
// return new ServiceImpl1(); | |
return new ServiceImpl2(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package foo; | |
/** | |
* @author Gary Russell | |
* | |
*/ | |
public interface Service { | |
void process(boolean crash) throws Exception; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package foo; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.transaction.annotation.Transactional; | |
/** | |
* @author Gary Russell | |
* | |
*/ | |
public class ServiceImpl1 implements Service { | |
@Autowired | |
private RabbitTemplate template; | |
@Transactional | |
public void process(boolean crash) { | |
Object o = template.receiveAndConvert(); | |
if (crash) { | |
throw new RuntimeException("crash"); | |
} | |
if (o != null) { | |
System.out.println(o); | |
template.convertAndSend(o); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package foo; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.transaction.annotation.Transactional; | |
/** | |
* @author Gary Russell | |
* | |
*/ | |
public class ServiceImpl2 implements Service { | |
@Autowired | |
private RabbitTemplate template; | |
@Override | |
@Transactional(rollbackFor=Exception.class) | |
public void process(boolean crash) throws Exception { | |
Object o = template.receiveAndConvert(); | |
if (crash) { | |
throw new FooEx("crash"); | |
} | |
if (o != null) { | |
System.out.println(o); | |
template.convertAndSend(o); | |
} | |
} | |
@SuppressWarnings("serial") | |
private class FooEx extends Exception { | |
private FooEx(String message) { | |
super(message); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package foo; | |
import static org.junit.Assert.assertNotNull; | |
import static org.junit.Assert.assertNull; | |
import org.junit.Test; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext; | |
import org.springframework.context.support.AbstractApplicationContext; | |
/** | |
* @author Gary Russell | |
* | |
*/ | |
public class TxTest { | |
@Test | |
public void test() { | |
final AbstractApplicationContext context = | |
new AnnotationConfigApplicationContext(Config.class); | |
RabbitTemplate template = context.getBean(RabbitTemplate.class); | |
template.convertAndSend("", "txTestQ1", "foo"); | |
Service service = context.getBean(Service.class); | |
service.process(false); | |
Object o = template.receiveAndConvert("txTestQ1"); | |
assertNull(o); | |
o = template.receiveAndConvert("txTestQ2"); | |
assertNotNull(o); | |
System.out.println("message " + o + " moved from Q1 to Q2"); | |
template.convertAndSend("", "txTestQ1", "bar"); | |
try { | |
service.process(true); | |
} | |
catch (Exception e) { | |
System.out.println(e.getMessage()); | |
} | |
o = template.receiveAndConvert("txTestQ1"); | |
assertNotNull(o); | |
System.out.println("message " + o + " still in Q1"); | |
o = template.receiveAndConvert("txTestQ2"); | |
assertNull(o); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
09:55:17.979 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Creating new transaction with name [foo.Service.process]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; '' | |
09:55:17.979 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:17.979 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Created AMQP transaction on channel [Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)] | |
09:55:17.980 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] to thread [main] | |
09:55:17.980 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [foo.Service.process] | |
09:55:17.994 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:17.994 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:17.997 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:17.997 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
foo | |
09:55:17.998 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:17.998 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:17.998 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Publishing message on exchange [], routingKey = [txTestQ2] | |
09:55:17.999 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:17.999 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [foo.Service.process] | |
09:55:17.999 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Initiating transaction commit | |
09:55:18.123 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] from thread [main] | |
09:55:18.123 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.123 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:18.124 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.124 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.125 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:18.125 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.126 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
message foo moved from Q1 to Q2 | |
09:55:18.126 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:18.126 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.127 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Publishing message on exchange [], routingKey = [txTestQ1] | |
09:55:18.242 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.242 DEBUG [main][org.springframework.beans.factory.support.DefaultListableBeanFactory] Returning cached instance of singleton bean 'transactionManager' | |
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Creating new transaction with name [foo.Service.process]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; '' | |
09:55:18.243 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Created AMQP transaction on channel [Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)] | |
09:55:18.243 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] to thread [main] | |
09:55:18.243 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [foo.Service.process] | |
09:55:18.243 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.244 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:18.244 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main] | |
09:55:18.244 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [foo.Service.process] after exception: java.lang.RuntimeException: crash | |
09:55:18.244 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Applying rules to determine whether transaction should rollback on java.lang.RuntimeException: crash | |
09:55:18.245 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Winning rollback rule is: null | |
09:55:18.245 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] No relevant rollback rule found: applying default rules | |
09:55:18.245 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Initiating transaction rollback | |
09:55:18.245 DEBUG [main][org.springframework.amqp.rabbit.connection.RabbitResourceHolder] Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.247 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] from thread [main] | |
09:55:18.247 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
crash | |
09:55:18.247 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel | |
09:55:18.247 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
09:55:18.248 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://[email protected]:5672/,1) | |
message bar still in Q1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment