Created
January 2, 2014 22:48
-
-
Save smaldini/8228612 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package reactor.tcp.netty; | |
import org.junit.Test; | |
import reactor.core.Environment; | |
import reactor.tcp.TcpClient; | |
import reactor.tcp.TcpServer; | |
import java.util.concurrent.TimeUnit; | |
import static junit.framework.Assert.assertEquals; | |
/** | |
* @author Stephane Maldini | |
*/ | |
public class ClientServerTests { | |
final private Environment environment = new Environment(); | |
public void simpleRequestReplyWithNettyTcp() throws Exception { | |
TcpServer server = null; | |
TcpClient client = null; | |
try{ | |
TestServer testServer = new TestServer(environment); | |
server = testServer.server(); | |
client = new TestClient(environment).client(); | |
testServer.latch().await(); | |
assertEquals("All messages successfully replied from server", testServer.latch().getCount(), 0); | |
} finally { | |
if(client != null){ | |
client.close(); | |
} | |
if(server != null){ | |
server.shutdown().await(); | |
} | |
} | |
} | |
} | |
package reactor.tcp.netty; | |
/** | |
* @author Stephane Maldini | |
*/ | |
import com.lmax.disruptor.YieldingWaitStrategy; | |
import com.lmax.disruptor.dsl.ProducerType; | |
import reactor.core.Environment; | |
import reactor.event.dispatch.RingBufferDispatcher; | |
import reactor.function.Consumer; | |
import reactor.io.encoding.StandardCodecs; | |
import reactor.tcp.TcpClient; | |
import reactor.tcp.TcpConnection; | |
import reactor.tcp.spec.TcpClientSpec; | |
public class TestClient { | |
private final TcpClient<String, String> client; | |
public TestClient(Environment env) throws Exception { | |
TcpClient<String, String> client = new TcpClientSpec<String, String>(NettyTcpClient.class) | |
.env(env) | |
.dispatcher(new RingBufferDispatcher("test", 1024, ProducerType.SINGLE, new YieldingWaitStrategy())) | |
.codec(StandardCodecs.STRING_CODEC) | |
.connect("localhost", 15151) | |
.get(); | |
TcpConnection<String, String> connection = client.open().await(); | |
connection.consume(new Consumer<String>() { | |
@Override | |
public void accept(String t) { | |
System.out.println("received: " + t); | |
} | |
}); | |
connection.send("data-from-client").onSuccess(new Consumer<Void>() { | |
@Override | |
public void accept(Void data) { | |
System.out.println("data sent"); | |
} | |
}).onError(new Consumer<Throwable>() { | |
@Override | |
public void accept(Throwable t) { | |
t.printStackTrace(); | |
} | |
}); | |
this.client = client; | |
} | |
public TcpClient<String, String> client() { | |
return client; | |
} | |
} | |
package reactor.tcp.netty; | |
import com.lmax.disruptor.YieldingWaitStrategy; | |
import com.lmax.disruptor.dsl.ProducerType; | |
import reactor.core.Environment; | |
import reactor.event.dispatch.RingBufferDispatcher; | |
import reactor.function.Consumer; | |
import reactor.io.encoding.StandardCodecs; | |
import reactor.tcp.TcpConnection; | |
import reactor.tcp.TcpServer; | |
import reactor.tcp.spec.TcpServerSpec; | |
import java.util.concurrent.CountDownLatch; | |
/** | |
* @author Stephane Maldini | |
*/ | |
public class TestServer { | |
private static final int NUMBER_OF_REPLIES = 2000; | |
private final TcpServer<String, String> server; | |
private final CountDownLatch latch = new CountDownLatch(NUMBER_OF_REPLIES); | |
public TestServer(Environment env) throws Exception { | |
Consumer<TcpConnection<String, String>> serverConsumer = new Consumer<TcpConnection<String, String>>() { | |
@Override | |
public void accept(final TcpConnection<String, String> connection) { | |
connection.consume(new Consumer<String>() { | |
@Override | |
public void accept(String data) { | |
System.out.println("Received data from client -> " + data); | |
//send data | |
for (int i = 0; i < NUMBER_OF_REPLIES; i++) { | |
final String msg = "msg-" + i; | |
connection.send(msg).onSuccess(new Consumer<Void>() { | |
@Override | |
public void accept(Void data) { | |
System.out.println("sent:" + msg); | |
latch.countDown(); | |
} | |
}).onError(new Consumer<Throwable>() { | |
@Override | |
public void accept(Throwable t) { | |
t.printStackTrace(); | |
} | |
}); | |
} | |
} | |
}); | |
} | |
}; | |
// server | |
TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class) | |
.env(env) | |
.dispatcher(new RingBufferDispatcher("test", 1024, ProducerType.SINGLE, new YieldingWaitStrategy())) | |
.listen("localhost", 15151) | |
.codec(StandardCodecs.STRING_CODEC) | |
.consume(serverConsumer).get(); | |
server.start(); | |
this.server = server; | |
} | |
public TcpServer<String, String> server() { | |
return server; | |
} | |
public CountDownLatch latch() { | |
return latch; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment