-
-
Save felipeg48/8028757 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2002-2010 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.shopzilla.amqp.core; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ShutdownListener; | |
import com.rabbitmq.client.ShutdownSignalException; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; | |
import org.springframework.beans.factory.annotation.Required; | |
import java.io.IOException; | |
/** | |
* @author Mark Lui | |
* @since Jan 5, 2011 | |
*/ | |
public class AutoRetryConnectionFactory extends SingleConnectionFactory { | |
private static final Log LOG = LogFactory.getLog(AutoRetryConnectionFactory.class); | |
private long retryDelay; | |
private AutoRetryMessageListenerContainer container; | |
public AutoRetryConnectionFactory(String hostName) { | |
super(hostName); | |
} | |
@Override | |
protected void prepareConnection(Connection con) throws IOException { | |
//Add a listener whenever the connection to RabbitMQ breaks | |
con.addShutdownListener(new AutoRetryShutdownListener(container, retryDelay, this)); | |
} | |
static class AutoRetryShutdownListener implements ShutdownListener { | |
private long retryDelay; | |
private AutoRetryMessageListenerContainer container; | |
private SingleConnectionFactory connectionFactory; | |
public AutoRetryShutdownListener(AutoRetryMessageListenerContainer container, long retryDelay, | |
SingleConnectionFactory connectionFactory) { | |
this.container = container; | |
this.retryDelay = retryDelay; | |
this.connectionFactory = connectionFactory; | |
} | |
public void shutdownCompleted(ShutdownSignalException cause) { | |
//Need to check reason to determine if reconnection logic should run | |
String exceptionMessage = cause.getMessage(); | |
//If clean connection shutdown do not run reconnection code | |
if (exceptionMessage.indexOf("clean connection shutdown") < 0) { | |
boolean containerDown = true; | |
while (containerDown) { | |
try { | |
//Container must be shutdown to allow a restart | |
if (container != null) { | |
container.shutdown(); | |
container.start(); | |
} else { | |
connectionFactory.resetConnection(); | |
} | |
containerDown = false; | |
} catch (Throwable ex) { | |
LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay)); | |
try { | |
Thread.sleep(retryDelay); | |
} catch (InterruptedException e) { | |
//do nothing | |
} | |
} | |
} | |
} | |
} | |
} | |
public void setContainer(AutoRetryMessageListenerContainer container) { | |
this.container = container; | |
} | |
@Required | |
public void setRetryDelay(long retryDelay) { | |
this.retryDelay = retryDelay; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2002-2010 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.shopzilla.amqp.core; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.springframework.amqp.core.Queue; | |
import org.springframework.amqp.rabbit.connection.ConnectionFactory; | |
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; | |
import org.springframework.amqp.rabbit.core.RabbitAdmin; | |
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; | |
import org.springframework.beans.factory.annotation.Required; | |
import org.springframework.util.ReflectionUtils; | |
import java.lang.reflect.Field; | |
import java.util.LinkedList; | |
import java.util.List; | |
/** | |
* @author Mark Lui | |
* @since Jan 5, 2011 | |
*/ | |
public class AutoRetryMessageListenerContainer extends SimpleMessageListenerContainer { | |
private static final Log LOG = LogFactory.getLog(AutoRetryMessageListenerContainer.class); | |
private long retryDelay; | |
private List<Queue> targetQueue; | |
private RabbitAdmin amqpAdmin; | |
public AutoRetryMessageListenerContainer() { | |
super(); | |
} | |
public AutoRetryMessageListenerContainer(AutoRetryConnectionFactory connectionFactory) { | |
super(connectionFactory); | |
connectionFactory.setContainer(this); | |
} | |
/** | |
* Capture exceptions from property set if RabbitMQ is down on startup. Will retry until reconnect is possible. | |
*/ | |
@Override | |
public void afterPropertiesSet() { | |
boolean started = false; | |
while (!started) { | |
try { | |
super.afterPropertiesSet(); | |
for(Queue queue: targetQueue) { | |
amqpAdmin.declareQueue(queue); | |
} | |
started = true; | |
} catch (Throwable ex) { | |
LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay)); | |
try { | |
Thread.sleep(retryDelay); | |
} catch (InterruptedException e) { | |
//do nothing | |
} | |
} | |
} | |
} | |
/** | |
* Augmented shutdown code to fix issues with current RabbitMQ container. | |
* This may be unnecessary in later versions of Spring AMQP support after 1.0.0.M1 | |
*/ | |
@Override | |
public void shutdown() { | |
((SingleConnectionFactory)getConnectionFactory()).resetConnection(); | |
super.shutdown(); | |
try { | |
Field consumersField = ReflectionUtils.findField(this.getClass(), "consumers"); | |
ReflectionUtils.makeAccessible(consumersField); | |
consumersField.set(this, null); | |
Field channelsField = ReflectionUtils.findField(this.getClass(), "channels"); | |
ReflectionUtils.makeAccessible(channelsField); | |
channelsField.set(this, null); | |
} catch (IllegalAccessException ex) { | |
throw new RuntimeException(ex); | |
} | |
} | |
@Required | |
public void setRetryDelay(long retryDelay) { | |
this.retryDelay = retryDelay; | |
} | |
public void setTargetQueue(Queue queue) { | |
if(targetQueue == null) { | |
targetQueue = new LinkedList<Queue>(); | |
} | |
targetQueue.add(queue); | |
} | |
public void setTargetQueueList(List<Queue> queues) { | |
targetQueue = queues; | |
} | |
@Required | |
public void setAmqpAdmin(RabbitAdmin amqpAdmin) { | |
this.amqpAdmin = amqpAdmin; | |
} | |
/** | |
* Wrapper final method isRunning to allow for unit testing | |
* @return boolean is container is running | |
*/ | |
public boolean isEnabled() { | |
return super.isRunning(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment