Skip to content

Instantly share code, notes, and snippets.

@dangnhdev
Created April 28, 2016 18:09
Show Gist options
  • Save dangnhdev/c3c3bb88677f755e95b3fee80f5b536a to your computer and use it in GitHub Desktop.
Save dangnhdev/c3c3bb88677f755e95b3fee80f5b536a to your computer and use it in GitHub Desktop.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Demo RabbitMQ subscriber.
 *
 * <p>Declares the {@code topic_test} topic exchange and a durable queue, binds the queue with
 * the {@code foo.bar} routing key, then consumes with manual acknowledgements. After
 * {@link #MAX_MESSAGES} messages have been acknowledged the connection is closed and
 * {@code main} returns.
 */
public class Subcriber {
    private static final String EXCHANGE_NAME = "topic_test";
    private static final String QUEUE_NAME = "Foo Queue";
    private static final String ROUTING_KEY = "foo.bar";

    /** Number of messages to acknowledge before shutting down. */
    private static final int MAX_MESSAGES = 10;

    /** Simulated per-message processing time, in milliseconds. */
    private static final long PROCESSING_DELAY_MILLIS = 500;

    /**
     * Connects to the broker on {@code localhost}, consumes {@link #MAX_MESSAGES} messages and
     * then closes the connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the topology, consuming or closing fails, or
     *     if the main thread is interrupted while waiting for the consumer to finish
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        final Connection connection = factory.newConnection();
        try {
            final Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Durable, non-exclusive, non-auto-delete queue.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            System.out.println(" [*] Waiting for messages");

            // Released by the consumer once MAX_MESSAGES deliveries have been acknowledged, so
            // that shutdown happens on the main thread rather than inside the callback.
            final CountDownLatch done = new CountDownLatch(1);

            Consumer consumer = new DefaultConsumer(channel) {
                private int received = 0;

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (received >= MAX_MESSAGES) {
                        // Shutdown is already in progress: leave the delivery unacknowledged
                        // so the broker can requeue it once the channel is closed.
                        return;
                    }

                    System.out.println(" [x] Received " + (++received));
                    // Explicit ack of this single delivery (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);

                    if (received == MAX_MESSAGES) {
                        done.countDown();
                        return;
                    }

                    try {
                        TimeUnit.MILLISECONDS.sleep(PROCESSING_DELAY_MILLIS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            };

            channel.basicConsume(QUEUE_NAME, false /* auto-ack off */, consumer);
            done.await();
        } finally {
            // Closing the connection also closes its channels.
            connection.close();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment