Created
December 6, 2016 06:11
-
-
Save clevertension/28f1af6bf28a7c474efc1bacf15d0147 to your computer and use it in GitHub Desktop.
Autoreconnect rabbitmq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.*; | |
import com.rabbitmq.utility.Utility; | |
import java.io.IOException; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Created by dan on 2016/10/19. | |
*/ | |
public class BlockableQueueConsumer extends DefaultConsumer { | |
private final BlockingQueue<QueueingConsumer.Delivery> _queue; | |
// When this is non-null the queue is in shutdown mode and nextDelivery should | |
// throw a shutdown signal exception. | |
private volatile ShutdownSignalException _shutdown; | |
private volatile ConsumerCancelledException _cancelled; | |
// Marker object used to signal the queue is in shutdown mode. | |
// It is only there to wake up consumers. The canonical representation | |
// of shutting down is the presence of _shutdown. | |
// Invariant: This is never on _queue unless _shutdown != null. | |
private static final QueueingConsumer.Delivery POISON = new QueueingConsumer.Delivery(null, null, null); | |
public BlockableQueueConsumer(Channel ch) { | |
this(ch, new LinkedBlockingQueue<QueueingConsumer.Delivery>()); | |
} | |
public BlockableQueueConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q) { | |
super(ch); | |
this._queue = q; | |
} | |
@Override public void handleShutdownSignal(String consumerTag, | |
ShutdownSignalException sig) { | |
_shutdown = sig; | |
} | |
@Override public void handleCancel(String consumerTag) throws IOException { | |
_cancelled = new ConsumerCancelledException(); | |
_queue.add(POISON); | |
} | |
@Override public void handleDelivery(String consumerTag, | |
Envelope envelope, | |
AMQP.BasicProperties properties, | |
byte[] body) | |
throws IOException | |
{ | |
if (this.getChannel().isOpen()) { | |
this._queue.add(new QueueingConsumer.Delivery(envelope, properties, body)); | |
} | |
} | |
/** | |
* If delivery is not POISON nor null, return it. | |
* <p/> | |
* If delivery, _shutdown and _cancelled are all null, return null. | |
* <p/> | |
* If delivery is POISON re-insert POISON into the queue and | |
* throw an exception if POISONed for no reason. | |
* <p/> | |
* Otherwise, if we are in shutdown mode or cancelled, | |
* throw a corresponding exception. | |
*/ | |
private QueueingConsumer.Delivery handle(QueueingConsumer.Delivery delivery) { | |
if (delivery == POISON || | |
delivery == null && (_shutdown != null || _cancelled != null)) { | |
if (delivery == POISON) { | |
_queue.add(POISON); | |
if (_shutdown == null && _cancelled == null) { | |
throw new IllegalStateException( | |
"POISON in queue, but null _shutdown and null _cancelled. " + | |
"This should never happen, please report as a BUG"); | |
} | |
} | |
if (null != _shutdown) { | |
throw Utility.fixStackTrace(_shutdown); | |
} | |
if (null != _cancelled) | |
throw Utility.fixStackTrace(_cancelled); | |
} | |
return delivery; | |
} | |
/** | |
* Main application-side API: wait for the next message delivery and return it. | |
* @return the next message | |
* @throws InterruptedException if an interrupt is received while waiting | |
* @throws ShutdownSignalException if the connection is shut down while waiting | |
* @throws ConsumerCancelledException if this consumer is cancelled while waiting | |
*/ | |
public QueueingConsumer.Delivery nextDelivery() | |
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException | |
{ | |
return handle(_queue.take()); | |
} | |
/** | |
* Main application-side API: wait for the next message delivery and return it. | |
* @param timeout timeout in millisecond | |
* @return the next message or null if timed out | |
* @throws InterruptedException if an interrupt is received while waiting | |
* @throws ShutdownSignalException if the connection is shut down while waiting | |
* @throws ConsumerCancelledException if this consumer is cancelled while waiting | |
*/ | |
public QueueingConsumer.Delivery nextDelivery(long timeout) | |
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException | |
{ | |
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.*; | |
import java.util.*; | |
/** | |
* 采用 lyra的组件, 实现了auto reconnect | |
* 其实不用任何组件,只要修改 QueuingConsumer这个类, 就可以实现 auto reconnect | |
*/ | |
public class Consumer { | |
private static final String QUEUE_NAME = "10bei-test-local4"; | |
private static Address[] getAddresses(String host) { | |
String[] strs = host.split(","); | |
List<Address> addresses = new ArrayList<>(); | |
for(String str: strs) { | |
addresses.add(new Address(str.trim())); | |
} | |
return addresses.toArray(new Address[0]); | |
} | |
public static void main(String[] args) throws Exception { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setUsername("10bei"); | |
factory.setPassword("10bei.cn"); | |
factory.setVirtualHost("/10bei"); | |
factory.setRequestedHeartbeat(5); | |
factory.setConnectionTimeout(5000); | |
factory.setAutomaticRecoveryEnabled(true); | |
Connection connection = factory.newConnection(getAddresses("192.168.1.4")); | |
final Channel channel = connection.createChannel(); | |
String queueName = QUEUE_NAME; | |
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建 | |
channel.queueDeclare(queueName, true, false, true, null); | |
Producer.XT xt = Producer.XT.HEADERS; | |
switch (xt) { | |
case FANOUT: | |
//接收端也声明一个fanout交换机 | |
channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null); | |
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange | |
//声明一个临时队列,该队列会在使用完比后自动销毁 | |
//将队列绑定到交换机,参数3无意义此时 | |
channel.queueBind(queueName, Producer.XCHG_NAME, ""); | |
break; | |
case DIRECT: | |
channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null); | |
channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个 | |
channel.queueBind(queueName, Producer.XCHG_NAME, "warning"); | |
break; | |
case TOPIC: | |
channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null); | |
channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词 | |
channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue"); | |
break; | |
case HEADERS: | |
channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, false, null); | |
Map<String, Object> headers = new HashMap<String, Object>() {{ | |
put("name", "test"); | |
put("sex", "male"); | |
put("x-match", "any");//all==匹配所有条件,any==匹配任意条件 | |
}}; | |
channel.queueBind(queueName, Producer.XCHG_NAME, Producer.ROUTING_KEY, headers); | |
break; | |
} | |
channel.basicQos(1); //server push消息时的队列长度 | |
BlockableQueueConsumer consumer = new BlockableQueueConsumer(channel); | |
// 指定接收者,第二个参数为自动应答,无需手动应答 | |
channel.basicConsume(QUEUE_NAME, false, consumer); | |
while (true) { | |
//消息采用阻塞的方式 | |
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); | |
String message = new String(delivery.getBody()); | |
System.out.println(message); | |
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment