Last active
October 6, 2017 05:36
-
-
Save unixc3t/bc3ac2045d2576a52fbdd3119027c814 to your computer and use it in GitHub Desktop.
RabbitMQ Point-to-point communication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.*; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.io.UnsupportedEncodingException; | |
/** | |
* rudy | |
* 11:43 PM | |
* 10/4/17. | |
*/ | |
public class CompetingReceiver { | |
private final static String QUEUE_NAME = "event_queue"; | |
private final static Logger LOGGER = LoggerFactory.getLogger(CompetingReceiver.class); | |
private Channel channel = null; | |
Connection connection = null; | |
private String message = null; | |
public void initialize() { | |
try { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("localhost"); | |
connection = factory.newConnection(); | |
channel = connection.createChannel(); | |
} catch (Exception e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public String receive() { | |
if (channel == null) { | |
initialize(); | |
} | |
try { | |
channel.queueDeclare(QUEUE_NAME, false, false, false, null); | |
Consumer consumer = new DefaultConsumer(channel) { | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, | |
AMQP.BasicProperties properties, byte[] body) { | |
try { | |
message = new String(body, "UTF-8"); | |
} catch (UnsupportedEncodingException e) { | |
e.printStackTrace(); | |
} | |
System.out.println(" [x] Received '" + message + "'"); | |
} | |
}; | |
channel.basicConsume(QUEUE_NAME, true, consumer); | |
} catch (Exception e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
return message; | |
} | |
public void destroy() { | |
if (connection != null) { | |
try { | |
connection.close(); | |
} catch (IOException e) { | |
LOGGER.warn(e.getMessage(), e); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* rudy | |
* 12:26 PM | |
* 10/5/17. | |
*/ | |
public class CompetingReceiverDemo { | |
public static void main(String[] args) throws InterruptedException { | |
final CompetingReceiver receiver1 = new CompetingReceiver(); | |
receiver1.initialize(); | |
final CompetingReceiver receiver2 = new CompetingReceiver(); | |
receiver2.initialize(); | |
Thread t1 = new Thread(new Runnable() { | |
public void run() { | |
receiver1.receive(); | |
} | |
}); | |
Thread t2 = new Thread(new Runnable() { | |
public void run() { | |
receiver2.receive(); | |
} | |
}); | |
t1.start(); | |
t2.start(); | |
t1.join(); | |
t2.join(); | |
receiver1.destroy(); | |
receiver2.destroy(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* rudy | |
* 11:55 AM | |
* 10/5/17. | |
*/ | |
public class DefaultExchangeSenderDemo { | |
public static void sendToDefaultExchange() { | |
Sender sender = new Sender(); | |
sender.initialize(); | |
sender.send("Test message."); | |
sender.destroy(); | |
} | |
public static void main(String[] args) { | |
sendToDefaultExchange(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ptop; | |
import com.rabbitmq.client.*; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.concurrent.ExecutionException; | |
/** | |
* rudy | |
* 10:31 PM | |
* 10/4/17. | |
*/ | |
public class Sender { | |
private static final String SEMINAR_QUEUE = "seminar_queue"; | |
private static final String HACKATON_QUEUE = "hackaton_queue"; | |
private static final String TOPIC_EXCHANGE = "topic_exchange"; | |
private static final String REQUEST_QUEUE = "request_queue"; | |
private static final String RESPONSE_QUEUE = "response_queue"; | |
private final static String QUEUE_NAME = "event_queue"; | |
private final static Logger LOGGER = LoggerFactory.getLogger(Sender.class); | |
private static final String DEFAULT_EXCHANGE = ""; | |
private Channel channel; | |
private Connection connection; | |
public void initialize() { | |
try { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("localhost"); | |
connection = factory.newConnection(); | |
channel = connection.createChannel(); | |
} catch (Exception e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void send(String message) { | |
try { | |
channel.queueDeclare(QUEUE_NAME, false, false, false, null); | |
channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null, message.getBytes()); | |
} catch (IOException e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void send(String exchange, String type, String message) { | |
try { | |
channel.exchangeDeclare(exchange, type); | |
channel.basicPublish(exchange, "", null, message.getBytes()); | |
} catch (IOException e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void sendRequest(String requestQueue, String message, String correlationId) { | |
try { | |
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null); | |
channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null); | |
AMQP.BasicProperties amqpProps = new AMQP.BasicProperties(); | |
amqpProps = amqpProps.builder().correlationId(String.valueOf(correlationId)) | |
.replyTo(RESPONSE_QUEUE).build(); | |
channel.basicPublish(DEFAULT_EXCHANGE, REQUEST_QUEUE, amqpProps, message.getBytes()); | |
} catch (Exception e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void waitForResponse(final String correlationId) { | |
final Sender stemp = this; | |
try { | |
Consumer consumer = new DefaultConsumer(channel) { | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
String message = new String(body, "UTF-8"); | |
if (properties != null) { | |
String msgCorrelationId = properties.getCorrelationId(); | |
if (!correlationId.equals(msgCorrelationId)) { | |
LOGGER.warn("Received response of another request."); | |
} | |
} | |
if (message != null) { | |
LOGGER.info("Message received: " + message); | |
stemp.destroy(); | |
} | |
} | |
}; | |
channel.basicConsume(RESPONSE_QUEUE, true, consumer); | |
} catch (Exception e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void sendEvent(String exchange, String message, String messageKey) { | |
try { | |
channel.exchangeDeclare(TOPIC_EXCHANGE, "topic"); | |
channel.queueDeclare(SEMINAR_QUEUE, false, false, false, null); | |
channel.queueDeclare(HACKATON_QUEUE, false, false, false, null); | |
channel.queueBind(SEMINAR_QUEUE, TOPIC_EXCHANGE, "seminar.#"); | |
channel.queueBind(HACKATON_QUEUE, TOPIC_EXCHANGE, "hackaton.#"); | |
channel.basicPublish(TOPIC_EXCHANGE, messageKey, null, message.getBytes()); | |
} catch (IOException e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
public void destroy() { | |
try { | |
if (connection != null) { | |
connection.close(); | |
} | |
} catch (IOException e) { | |
LOGGER.warn(e.getMessage(), e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment