Skip to content

Instantly share code, notes, and snippets.

@unixc3t
Last active October 6, 2017 05:36
Show Gist options
  • Save unixc3t/bc3ac2045d2576a52fbdd3119027c814 to your computer and use it in GitHub Desktop.
Save unixc3t/bc3ac2045d2576a52fbdd3119027c814 to your computer and use it in GitHub Desktop.
RabbitMQ Point-to-point communication
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
 * Receiver that consumes UTF-8 text messages from the {@code event_queue} queue of a
 * RabbitMQ broker running on {@code localhost}.
 *
 * <p>Typical usage is {@link #initialize()}, one or more calls to {@link #receive()},
 * and finally {@link #destroy()}. Errors are logged rather than propagated, matching
 * the rest of this example code.
 *
 * <p>Originally written by rudy, 10/4/17, 11:43 PM.
 */
public class CompetingReceiver {
    private final static String QUEUE_NAME = "event_queue";
    private final static Logger LOGGER = LoggerFactory.getLogger(CompetingReceiver.class);

    private Channel channel = null;
    Connection connection = null;

    /**
     * Most recently received message. It is written by the client's consumer thread and
     * read by the thread calling {@link #receive()}, hence {@code volatile}.
     */
    private volatile String message = null;

    /**
     * Opens a connection to the broker on {@code localhost} and creates a channel on it.
     * Any failure is logged; in that case {@code channel} stays {@code null}.
     */
    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Declares the queue, registers an auto-acknowledging consumer on it and blocks until
     * that consumer has received a message, has been cancelled, or its channel/connection
     * has shut down.
     *
     * <p>Deliveries are handled asynchronously by the client library, so returning
     * immediately after {@code basicConsume} (as this method used to do) would hand back
     * {@code null} before any message could arrive. The consumer stays registered after
     * this method returns; later deliveries are still printed and update the stored
     * message.
     *
     * @return the most recently received message, or {@code null} if none has been
     *         received (no channel, broker error, shutdown or interruption)
     */
    public String receive() {
        if (channel == null) {
            initialize();
        }
        if (channel == null) {
            // initialize() already logged the cause; avoid a NullPointerException below.
            LOGGER.error("Cannot receive from '{}': no channel available.", QUEUE_NAME);
            return message;
        }

        // Released by the first delivery or by any event that ends the consumer.
        final java.util.concurrent.CountDownLatch finished =
                new java.util.concurrent.CountDownLatch(1);
        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) {
                    message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(" [x] Received '" + message + "'");
                    finished.countDown();
                }

                @Override
                public void handleCancel(String consumerTag) {
                    // Consumer cancelled by the broker: nothing more will arrive.
                    finished.countDown();
                }

                @Override
                public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                    // Channel or connection closed (e.g. by destroy()): stop waiting.
                    finished.countDown();
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
            finished.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while waiting for a message.", e);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        return message;
    }

    /**
     * Closes the connection if it is open. Safe to call more than once and when
     * {@link #initialize()} failed.
     */
    public void destroy() {
        if (connection == null || !connection.isOpen()) {
            return;
        }
        try {
            connection.close();
        } catch (AlreadyClosedException e) {
            // Lost a race with another close; the goal is already achieved.
            LOGGER.debug("Connection was already closed.", e);
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }
}
/**
 * Demo that runs two {@link CompetingReceiver} instances, each calling
 * {@code receive()} on its own thread.
 *
 * <p>Originally written by rudy, 10/5/17, 12:26 PM.
 */
public class CompetingReceiverDemo {

    /**
     * Initializes two receivers, runs {@code receive()} for each on a separate thread,
     * waits for both threads to finish and then destroys both receivers.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        final CompetingReceiver receiver1 = new CompetingReceiver();
        final CompetingReceiver receiver2 = new CompetingReceiver();
        try {
            receiver1.initialize();
            receiver2.initialize();

            Thread t1 = receiverThread(receiver1);
            Thread t2 = receiverThread(receiver2);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
        } finally {
            // Always release both connections, even if join() is interrupted or the
            // first destroy() fails.
            try {
                receiver1.destroy();
            } finally {
                receiver2.destroy();
            }
        }
    }

    /** Creates (but does not start) a thread that calls {@code receive()} once. */
    private static Thread receiverThread(final CompetingReceiver receiver) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                receiver.receive();
            }
        });
    }
}
/**
 * Demo that publishes a single test message through {@link Sender#send(String)}.
 *
 * <p>Originally written by rudy, 10/5/17, 11:55 AM.
 */
public class DefaultExchangeSenderDemo {

    /**
     * Creates a {@link Sender}, initializes it, sends {@code "Test message."} and
     * destroys the sender again.
     */
    public static void sendToDefaultExchange() {
        Sender sender = new Sender();
        try {
            sender.initialize();
            sender.send("Test message.");
        } finally {
            // Release the connection even if send() throws an unchecked exception.
            sender.destroy();
        }
    }

    /**
     * Entry point; delegates to {@link #sendToDefaultExchange()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        sendToDefaultExchange();
    }
}
package ptop;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
/**
* rudy
* 10:31 PM
* 10/4/17.
*/
public class Sender {
private static final String SEMINAR_QUEUE = "seminar_queue";
private static final String HACKATON_QUEUE = "hackaton_queue";
private static final String TOPIC_EXCHANGE = "topic_exchange";
private static final String REQUEST_QUEUE = "request_queue";
private static final String RESPONSE_QUEUE = "response_queue";
private final static String QUEUE_NAME = "event_queue";
private final static Logger LOGGER = LoggerFactory.getLogger(Sender.class);
private static final String DEFAULT_EXCHANGE = "";
private Channel channel;
private Connection connection;
public void initialize() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void send(String message) {
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null, message.getBytes());
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
public void send(String exchange, String type, String message) {
try {
channel.exchangeDeclare(exchange, type);
channel.basicPublish(exchange, "", null, message.getBytes());
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
public void sendRequest(String requestQueue, String message, String correlationId) {
try {
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
AMQP.BasicProperties amqpProps = new AMQP.BasicProperties();
amqpProps = amqpProps.builder().correlationId(String.valueOf(correlationId))
.replyTo(RESPONSE_QUEUE).build();
channel.basicPublish(DEFAULT_EXCHANGE, REQUEST_QUEUE, amqpProps, message.getBytes());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void waitForResponse(final String correlationId) {
final Sender stemp = this;
try {
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
if (properties != null) {
String msgCorrelationId = properties.getCorrelationId();
if (!correlationId.equals(msgCorrelationId)) {
LOGGER.warn("Received response of another request.");
}
}
if (message != null) {
LOGGER.info("Message received: " + message);
stemp.destroy();
}
}
};
channel.basicConsume(RESPONSE_QUEUE, true, consumer);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void sendEvent(String exchange, String message, String messageKey) {
try {
channel.exchangeDeclare(TOPIC_EXCHANGE, "topic");
channel.queueDeclare(SEMINAR_QUEUE, false, false, false, null);
channel.queueDeclare(HACKATON_QUEUE, false, false, false, null);
channel.queueBind(SEMINAR_QUEUE, TOPIC_EXCHANGE, "seminar.#");
channel.queueBind(HACKATON_QUEUE, TOPIC_EXCHANGE, "hackaton.#");
channel.basicPublish(TOPIC_EXCHANGE, messageKey, null, message.getBytes());
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
public void destroy() {
try {
if (connection != null) {
connection.close();
}
} catch (IOException e) {
LOGGER.warn(e.getMessage(), e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment