Skip to content

Instantly share code, notes, and snippets.

@unixc3t
Created October 5, 2017 07:45
Show Gist options
  • Save unixc3t/85dd090b0948818c0a14e6d834959246 to your computer and use it in GitHub Desktop.
Save unixc3t/85dd090b0948818c0a14e6d834959246 to your computer and use it in GitHub Desktop.
Publish-subscribe communication
package ptos;
import ptop.Sender;
/**
 * Demo publisher that sends a single test message through a {@link Sender},
 * passing {@code "fanout"} as the exchange type.
 *
 * <p>Created by rudy, 10/5/17 3:15 PM.
 */
public class FanoutExchangeSenderDemo {

    private static final String FANOUT_EXCHANGE_TYPE = "fanout";

    /**
     * Initializes a {@link Sender}, sends one test message to the given exchange
     * and then destroys the sender.
     *
     * @param exchange name of the exchange handed to {@code Sender.send}
     */
    public static void sendToFanoutExchange(String exchange) {
        Sender sender = new Sender();
        sender.initialize();
        try {
            sender.send(exchange, FANOUT_EXCHANGE_TYPE, "Test fanout message.");
        } finally {
            // Always release the sender, even when send() fails.
            sender.destroy();
        }
    }

    /**
     * Entry point: publishes the test message to {@code pubsub_exchange}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        sendToFanoutExchange("pubsub_exchange");
    }
}
package ptos;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Receiver that declares the {@code pubsub_exchange} fanout exchange, binds a
 * queue to it and registers a consumer that records and prints each message.
 *
 * <p>Created by rudy, 10/5/17 2:34 PM.
 */
public class PublishSubscribeReceiver {

    private final static String EXCHANGE_NAME = "pubsub_exchange";
    private final static Logger LOGGER = LoggerFactory.getLogger(PublishSubscribeReceiver.class);

    private Channel channel = null;
    private Connection connection = null;

    // Written by the consumer callback and read by receive(). The callback may
    // run on a different thread than the caller, so the field is volatile to
    // make the latest value visible.
    volatile String message = null;

    /**
     * Opens a connection to the broker on {@code localhost} and creates a channel.
     * Failures are logged, not thrown; on failure {@code channel} stays {@code null}.
     */
    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            // Do not keep a half-initialized connection around if the channel
            // could not be created.
            channel = null;
            destroy();
        }
    }

    /**
     * Declares the fanout exchange and the given queue, binds them together and
     * registers an auto-acknowledging consumer on the queue.
     *
     * <p>Messages are delivered through the consumer callback, so the value
     * returned here is whatever message has been recorded so far and may be
     * {@code null}.
     *
     * @param queue name of the queue to declare, bind and consume from
     * @return the most recently recorded message, or {@code null} if none yet
     */
    public String receive(String queue) {
        if (channel == null) {
            initialize();
        }
        if (channel == null) {
            // initialize() already logged the cause; avoid a NullPointerException below.
            LOGGER.error("Cannot receive from queue '{}': no channel available", queue);
            return message;
        }
        try {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(queue, false, false, false, null);
            channel.queueBind(queue, EXCHANGE_NAME, "");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    message = text;
                    System.out.println(" [x] Received '" + text + "'");
                    LOGGER.debug(text);
                }
            };
            channel.basicConsume(queue, true, consumer);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        return message;
    }

    /**
     * Closes the connection if one is open and forgets the connection and channel
     * so that a later {@link #receive(String)} re-initializes instead of reusing
     * closed handles. I/O failures while closing are logged as warnings.
     */
    public void destroy() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOGGER.warn(e.getMessage(), e);
        } finally {
            connection = null;
            channel = null;
        }
    }
}
package ptos;
/**
 * Demo that starts two {@link PublishSubscribeReceiver}s, each consuming from its
 * own queue ({@code pubsub_queue1} and {@code pubsub_queue2}) on a separate thread.
 *
 * <p>Created by rudy, 10/5/17 3:18 PM.
 */
public class PublishSubscribeReceiverDemo {

    /**
     * Initializes both receivers, runs {@code receive} for each on its own thread,
     * waits for both threads to finish and then destroys the receivers.
     *
     * <p>NOTE(review): {@code receive} appears to return as soon as the consumer
     * is registered, so the receivers may be destroyed shortly after subscribing.
     * Confirm whether the demo is expected to keep running.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        final PublishSubscribeReceiver receiver1 = new PublishSubscribeReceiver();
        receiver1.initialize();
        final PublishSubscribeReceiver receiver2 = new PublishSubscribeReceiver();
        receiver2.initialize();
        try {
            Thread t1 = new Thread(new Runnable() {
                public void run() {
                    receiver1.receive("pubsub_queue1");
                }
            });
            Thread t2 = new Thread(new Runnable() {
                public void run() {
                    receiver2.receive("pubsub_queue2");
                }
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();
        } finally {
            // Release both receivers even if join() is interrupted or the first
            // destroy() fails.
            try {
                receiver1.destroy();
            } finally {
                receiver2.destroy();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment