Skip to content

Instantly share code, notes, and snippets.

@smaldini
Created January 2, 2014 22:48
Show Gist options
  • Save smaldini/8228612 to your computer and use it in GitHub Desktop.
Save smaldini/8228612 to your computer and use it in GitHub Desktop.
package reactor.tcp.netty;
import org.junit.Test;
import reactor.core.Environment;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpServer;
import java.util.concurrent.TimeUnit;
import static junit.framework.Assert.assertEquals;
/**
 * Request/reply round trip between a {@link TestClient} and a {@link TestServer}
 * over Netty TCP.
 *
 * @author Stephane Maldini
 */
public class ClientServerTests {

    /** Upper bound on how long the test waits for the server to send all of its replies. */
    private static final long REPLY_TIMEOUT_SECONDS = 30;

    private final Environment environment = new Environment();

    /**
     * Starts the server, connects the client, and checks that the server's latch
     * reaches zero within {@link #REPLY_TIMEOUT_SECONDS}.
     *
     * @throws Exception if server/client setup, the wait, or the teardown fails
     */
    @Test
    public void simpleRequestReplyWithNettyTcp() throws Exception {
        TcpServer<String, String> server = null;
        TcpClient<String, String> client = null;
        try {
            TestServer testServer = new TestServer(environment);
            server = testServer.server();
            client = new TestClient(environment).client();

            // Bounded wait: an untimed await() would block forever if any send fails,
            // and would make the count assertion below trivially true.
            testServer.latch().await(REPLY_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            // Expected value first, actual second, so a failure message reads correctly.
            assertEquals("All messages successfully replied from server", 0L, testServer.latch().getCount());
        } finally {
            try {
                if (client != null) {
                    client.close();
                }
            } finally {
                // Nested finally: the server is shut down even if closing the client throws.
                if (server != null) {
                    server.shutdown().await();
                }
            }
        }
    }
}
package reactor.tcp.netty;
/**
* @author Stephane Maldini
*/
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import reactor.core.Environment;
import reactor.event.dispatch.RingBufferDispatcher;
import reactor.function.Consumer;
import reactor.io.encoding.StandardCodecs;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.spec.TcpClientSpec;
/**
 * Test helper that builds a Netty-backed {@link TcpClient} targeting
 * {@code localhost:15151}, opens a connection, prints every message received on
 * it, and sends a single {@code "data-from-client"} message.
 */
public class TestClient {

    private final TcpClient<String, String> client;

    /**
     * Builds the client, opens a connection and sends the initial message.
     *
     * @param env the Reactor environment handed to the client spec
     * @throws IllegalStateException if opening the connection yields no connection
     * @throws Exception if building the client or waiting for the connection fails
     */
    public TestClient(Environment env) throws Exception {
        TcpClient<String, String> tcpClient = new TcpClientSpec<String, String>(NettyTcpClient.class)
                .env(env)
                .dispatcher(new RingBufferDispatcher("test", 1024, ProducerType.SINGLE, new YieldingWaitStrategy()))
                .codec(StandardCodecs.STRING_CODEC)
                .connect("localhost", 15151)
                .get();

        TcpConnection<String, String> connection = tcpClient.open().await();
        if (connection == null) {
            // NOTE(review): await() presumably returns null when the promise is not
            // fulfilled in time - confirm against the Promise API. Fail with a clear
            // message instead of a NullPointerException, and do not leave the
            // half-initialised client open.
            tcpClient.close();
            throw new IllegalStateException("Unable to open a connection to localhost:15151");
        }

        // Print every message received on the connection.
        connection.consume(new Consumer<String>() {
            @Override
            public void accept(String t) {
                System.out.println("received: " + t);
            }
        });

        // Send the initial message; report the outcome of the send.
        connection.send("data-from-client").onSuccess(new Consumer<Void>() {
            @Override
            public void accept(Void data) {
                System.out.println("data sent");
            }
        }).onError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable t) {
                t.printStackTrace();
            }
        });

        this.client = tcpClient;
    }

    /**
     * @return the client built by the constructor
     */
    public TcpClient<String, String> client() {
        return client;
    }
}
package reactor.tcp.netty;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import reactor.core.Environment;
import reactor.event.dispatch.RingBufferDispatcher;
import reactor.function.Consumer;
import reactor.io.encoding.StandardCodecs;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.spec.TcpServerSpec;
import java.util.concurrent.CountDownLatch;
/**
 * Test helper that starts a Netty-backed {@link TcpServer} on
 * {@code localhost:15151}. For every data item received on a connection it sends
 * {@code NUMBER_OF_REPLIES} messages ({@code msg-0}, {@code msg-1}, ...) back on
 * that connection and counts the latch down once per successful send.
 *
 * @author Stephane Maldini
 */
public class TestServer {

    private static final int NUMBER_OF_REPLIES = 2000;

    private final TcpServer<String, String> server;
    private final CountDownLatch latch = new CountDownLatch(NUMBER_OF_REPLIES);

    /**
     * Builds and starts the server.
     *
     * @param env the Reactor environment handed to the server spec
     * @throws Exception if building or starting the server fails
     */
    public TestServer(Environment env) throws Exception {
        Consumer<TcpConnection<String, String>> onConnection = connectionHandler();

        // NOTE(review): ProducerType.SINGLE assumes one publishing thread - confirm
        // that holds for this dispatcher.
        TcpServer<String, String> started = new TcpServerSpec<String, String>(NettyTcpServer.class)
                .env(env)
                .dispatcher(new RingBufferDispatcher("test", 1024, ProducerType.SINGLE, new YieldingWaitStrategy()))
                .listen("localhost", 15151)
                .codec(StandardCodecs.STRING_CODEC)
                .consume(onConnection)
                .get();
        started.start();

        this.server = started;
    }

    /**
     * @return the server built and started by the constructor
     */
    public TcpServer<String, String> server() {
        return server;
    }

    /**
     * @return the latch counted down once per successfully sent reply
     */
    public CountDownLatch latch() {
        return latch;
    }

    /** Creates the per-connection callback that registers the data handler on each connection. */
    private Consumer<TcpConnection<String, String>> connectionHandler() {
        return new Consumer<TcpConnection<String, String>>() {
            @Override
            public void accept(final TcpConnection<String, String> connection) {
                connection.consume(new Consumer<String>() {
                    @Override
                    public void accept(String received) {
                        System.out.println("Received data from client -> " + received);
                        replyToAll(connection);
                    }
                });
            }
        };
    }

    /** Sends the full series of replies, in ascending index order, on the given connection. */
    private void replyToAll(TcpConnection<String, String> connection) {
        int index = 0;
        while (index < NUMBER_OF_REPLIES) {
            sendReply(connection, index);
            index++;
        }
    }

    /** Sends one reply; a successful send is printed and counted, a failed one prints its stack trace. */
    private void sendReply(TcpConnection<String, String> connection, int index) {
        final String payload = "msg-" + index;
        connection.send(payload).onSuccess(new Consumer<Void>() {
            @Override
            public void accept(Void ignored) {
                System.out.println("sent:" + payload);
                latch.countDown();
            }
        }).onError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable failure) {
                failure.printStackTrace();
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment