Skip to content

Instantly share code, notes, and snippets.

@dangnhdev
Last active April 28, 2016 17:53
Show Gist options
  • Save dangnhdev/85af62c1c27920c5173cc09895ab42c1 to your computer and use it in GitHub Desktop.
Save dangnhdev/85af62c1c27920c5173cc09895ab42c1 to your computer and use it in GitHub Desktop.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Subscriber {
private static final String EXCHANGE_NAME = "topic_test";
private static final String queueName = "Foo Queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//create a durable, non-autodelete, non-exclusive queue
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "foo.bar");
System.out.println(" [*] Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
private int i = 0;
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(" [x] Received " + (++i));
//gracefully exit (not kill -9) if received 10 message
if (i%10 == 0) {
System.exit(0);
}
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ignore) {
}
}
};
channel.basicConsume(queueName, true /*auto-ack*/, consumer);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment