Skip to content

Instantly share code, notes, and snippets.

@tibor-kocsis
Last active January 29, 2018 12:39
Show Gist options
  • Save tibor-kocsis/a75d5a8cc8dd8d9a5a2f494f5bf48865 to your computer and use it in GitHub Desktop.
Save tibor-kocsis/a75d5a8cc8dd8d9a5a2f494f5bf48865 to your computer and use it in GitHub Desktop.
Simple reproducer for vertx-rabbitmq message receiving order bug
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.rabbitmq.RabbitMQClient;
public class P02_RabbitMQ_BasicQueueSend {
static JsonObject config = new JsonObject()
.put("user", "guest")
.put("password", "guest")
.put("host", "localhost")
.put("port", 5672)
.put("virtualHost", "/")
.put("connectionTimeout", 60000) // in milliseconds
.put("requestedHeartbeat", 60) // in seconds
.put("handshakeTimeout", 60000) // in milliseconds
.put("requestedChannelMax", 5)
.put("networkRecoveryInterval", 5000) // in milliseconds
.put("automaticRecoveryEnabled", true);
static String QUEUE_NAME = "testchannel";
public static class Sender {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.start(connectResult -> {
System.out.println("Client connected: " + connectResult.succeeded());
AtomicInteger i = new AtomicInteger(0);
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Handler<AsyncResult<JsonObject>> resultHandler
client.queueDeclare(QUEUE_NAME, true, false, false, declareResult -> {
System.out.println("Queue declared: " + declareResult.succeeded() + ", start sending");
vertx.setPeriodic(1000, timerId -> {
JsonObject message = new JsonObject()
.put("properties", new JsonObject().put("contentType", "application/json"))
.put("body", new JsonObject().put("counter", i.getAndIncrement()));
client.basicPublish("", QUEUE_NAME, message, sendingResult -> {
if (sendingResult.cause() != null) {
sendingResult.cause().printStackTrace();
}
});
});
});
});
}
}
public static class Receiver {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
RabbitMQClient client = RabbitMQClient.create(vertx, config);
client.start(connectResult -> {
System.out.println(Thread.currentThread() + " Client connected: " + connectResult.succeeded());
// String queue, boolean durable, boolean exclusive, boolean
// autoDelete, Handler<AsyncResult<JsonObject>> resultHandler
client.queueDeclare(QUEUE_NAME, true, false, false, declareResult -> {
System.out.println(Thread.currentThread() + " Queue declared: " + declareResult.succeeded() + ", start receiving");
vertx.eventBus().consumer("eventbus.channel", msg -> {
JsonObject json = (JsonObject) msg.body();
System.out.println(Thread.currentThread() + " Got message: " + json.encode() + " " + json.getJsonObject("body").toString());
});
client.basicConsume(QUEUE_NAME, "eventbus.channel", consumeResult -> {
if (consumeResult.succeeded()) {
System.out.println(Thread.currentThread() + " RabbitMQ consumer created!");
} else {
consumeResult.cause().printStackTrace();
}
});
});
});
}
}
public static class ReceiverNative {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.getString("host"));
factory.setPort(config.getInteger("port"));
factory.setUsername(config.getString("user"));
factory.setPassword(config.getString("password"));
factory.setVirtualHost(config.getString("virtualHost"));
Connection connection = factory.newConnection();
Channel rabbitMQChannel = connection.createChannel();
String queue = rabbitMQChannel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>()).getQueue();
Consumer consumer = new DefaultConsumer(rabbitMQChannel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println(message);
}
};
rabbitMQChannel.basicConsume(queue, true, consumer);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment