Skip to content

Instantly share code, notes, and snippets.

@unixc3t
Created October 6, 2017 05:39
Show Gist options
  • Select an option

  • Save unixc3t/8494a90f643ec430912af6afb1816919 to your computer and use it in GitHub Desktop.

Select an option

Save unixc3t/8494a90f643ec430912af6afb1816919 to your computer and use it in GitHub Desktop.
Message router
package mr;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
/**
* rudy
* 1:00 PM
* 10/6/17.
*/
public class MessageRouterReceiver {
private static final String TOPIC_EXCHANGE = "topic_exchange";
private final static Logger LOGGER = LoggerFactory.getLogger(MessageRouterReceiver.class);
private Channel channel;
private Connection connection;
public void initialize() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void receive(String queue, String routeKey) {
if (channel == null) {
initialize();
}
try {
channel.queueDeclare(queue, false, false, false, null);
channel.exchangeDeclare(TOPIC_EXCHANGE, "topic");
channel.queueBind(queue, TOPIC_EXCHANGE, routeKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
String message = new String(body, "UTF-8");
System.out.println(" routeKey: ["+routeKey+"] Received '" + message + "'");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(queue, true, consumer);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}
package mr;
/**
* rudy
* 1:21 PM
* 10/6/17.
*/
public class TopicReceiverDemo {
public static void receiveMessage() {
MessageRouterReceiver mrr_seminar = new MessageRouterReceiver();
MessageRouterReceiver mrr_hackation = new MessageRouterReceiver();
mrr_seminar.initialize();
mrr_hackation.initialize();
mrr_seminar.receive("seminar_queue","seminar.#");
mrr_hackation.receive("hackaton_queue","hackation.#");
}
public static void main(String[] args) {
receiveMessage();
}
}
package mr;
import ptop.Sender;
/**
* rudy
* 1:22 PM
* 10/6/17.
*/
public class TopicSenderDemo {
private static final String TOPIC_EXCHANGE = "topic_exchange";
public static void sendToTopicExchange() {
Sender sender = new Sender();
sender.initialize();
sender.sendEvent(TOPIC_EXCHANGE, "Test message 1.", "seminar.java");
sender.sendEvent(TOPIC_EXCHANGE, "Test message 2.", "seminar.rabbitmq");
sender.sendEvent(TOPIC_EXCHANGE, "Test message 3.", "hackaton.rabbitmq");
sender.destroy();
}
public static void main(String[] args) {
sendToTopicExchange();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment