Created
September 18, 2019 19:54
-
-
Save lega911/35b4f6df6b0400ae9503dfaee9ed4f5e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.AMQP; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import java.io.IOException; | |
import java.util.UUID; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeoutException; | |
import java.util.Date; | |
public class RPCClient implements AutoCloseable { | |
private Connection connection; | |
private Channel channel; | |
private String requestQueueName = "rpc_queue"; | |
public RPCClient() throws IOException, TimeoutException { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("localhost"); | |
connection = factory.newConnection(); | |
channel = connection.createChannel(); | |
} | |
public static void main(String[] argv) { | |
try (RPCClient fibonacciRpc = new RPCClient()) { | |
long start = (new Date()).getTime(); | |
long count = 1000; | |
for (int i = 0; i < count; i++) { | |
String i_str = Integer.toString(i); | |
String response = fibonacciRpc.call(i_str); | |
} | |
long end = (new Date()).getTime(); | |
float result = (float)count / (float)(end - start) * 1000; | |
System.out.println("rps: " + result); | |
} catch (IOException | TimeoutException | InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
public String call(String message) throws IOException, InterruptedException { | |
final String corrId = UUID.randomUUID().toString(); | |
String replyQueueName = channel.queueDeclare().getQueue(); | |
AMQP.BasicProperties props = new AMQP.BasicProperties | |
.Builder() | |
.correlationId(corrId) | |
.replyTo(replyQueueName) | |
.build(); | |
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); | |
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); | |
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { | |
if (delivery.getProperties().getCorrelationId().equals(corrId)) { | |
response.offer(new String(delivery.getBody(), "UTF-8")); | |
} | |
}, consumerTag -> { | |
}); | |
String result = response.take(); | |
channel.basicCancel(ctag); | |
return result; | |
} | |
public void close() throws IOException { | |
connection.close(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.*; | |
public class RPCServer { | |
private static final String RPC_QUEUE_NAME = "rpc_queue"; | |
private static int func(int n) { | |
return n; | |
} | |
public static void main(String[] argv) throws Exception { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("localhost"); | |
try (Connection connection = factory.newConnection(); | |
Channel channel = connection.createChannel()) { | |
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); | |
channel.queuePurge(RPC_QUEUE_NAME); | |
channel.basicQos(1); | |
System.out.println(" [x] Awaiting RPC requests"); | |
Object monitor = new Object(); | |
DeliverCallback deliverCallback = (consumerTag, delivery) -> { | |
AMQP.BasicProperties replyProps = new AMQP.BasicProperties | |
.Builder() | |
.correlationId(delivery.getProperties().getCorrelationId()) | |
.build(); | |
String response = ""; | |
try { | |
String message = new String(delivery.getBody(), "UTF-8"); | |
int n = Integer.parseInt(message); | |
response += func(n); | |
} catch (RuntimeException e) { | |
System.out.println(" [.] " + e.toString()); | |
} finally { | |
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); | |
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); | |
// RabbitMq consumer worker thread notifies the RPC server owner thread | |
synchronized (monitor) { | |
monitor.notify(); | |
} | |
} | |
}; | |
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); | |
// Wait and be prepared to consume the message from RPC client. | |
while (true) { | |
synchronized (monitor) { | |
try { | |
monitor.wait(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment