Last active
December 13, 2015 21:08
-
-
Save falconair/4975243 to your computer and use it in GitHub Desktop.
Java network performance comparison
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package networkioshootout; | |
import static java.lang.System.out; | |
import java.io.BufferedInputStream; | |
import java.io.DataInputStream; | |
import java.io.DataOutputStream; | |
import java.io.EOFException; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.net.InetAddress; | |
import java.net.InetSocketAddress; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.net.StandardSocketOptions; | |
import java.net.UnknownHostException; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.AsynchronousSocketChannel; | |
import java.nio.channels.CompletionHandler; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.text.DecimalFormat; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import org.jboss.netty.bootstrap.ClientBootstrap; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.channel.Channel; | |
import org.jboss.netty.channel.ChannelFactory; | |
import org.jboss.netty.channel.ChannelFuture; | |
import org.jboss.netty.channel.ChannelHandlerContext; | |
import org.jboss.netty.channel.ChannelPipeline; | |
import org.jboss.netty.channel.ChannelPipelineFactory; | |
import org.jboss.netty.channel.Channels; | |
import org.jboss.netty.channel.ExceptionEvent; | |
import org.jboss.netty.channel.MessageEvent; | |
import org.jboss.netty.channel.SimpleChannelHandler; | |
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; | |
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory; | |
import org.jboss.netty.handler.codec.frame.FrameDecoder; | |
/** | |
* | |
* @author Shahbaz Chaudhary | |
* TODO compare async w/ futures and w/ completion handler | |
* Check numbers for sanity | |
*/ | |
public class NetworkPerformanceComparison { | |
private final static int SENDCOUNT = 100_000; | |
private final static int PORT = 9000; | |
private final static int TESTLOOP = 30; | |
private final static DecimalFormat df = new DecimalFormat("#.####"); | |
private final static Random rn = new Random(); | |
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { | |
//Server | |
new Server().start(); | |
//Client | |
//warmup | |
clientNettyOIO(1024,true); | |
clientNettyNIO(1024,true); | |
clientNoNetwork(1024, true); | |
clientInputstream(1024, true); | |
clientBufferedInputstream(1024, true); | |
clientDataBufferedInputstream(1024, true); | |
clientAsyncChannel(1024, true); | |
clientNonBlockedSelectorChannel(1024, true); | |
clientNonBlockedSpinChannel(1024, true); | |
clientBlockedChannel(1024, true); | |
attemptGC(); | |
//System.out.println("#Warmup done"); | |
//test | |
out.println("ID,MIN_LATENCY,MAX_LATENCY,AVG_LATENCY,MICROSECS_PER_BYTE,MSGS_PER_MILLISEC,TCP_NO_DELAY,BUFFER_SIZE,ITERATION"); | |
for(int i=0; i< TESTLOOP; i++){ | |
final int DATASIZE = (1+rn.nextInt(99))*16; | |
final boolean TCP_NO_DELAY = rn.nextBoolean(); | |
attemptGC(); | |
printResult(clientNettyOIO(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientNettyNIO(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientNoNetwork(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientNonBlockedSelectorChannel(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientInputstream(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientBufferedInputstream(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientDataBufferedInputstream(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientDataInputstream(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientAsyncChannel(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientNonBlockedSpinChannel(DATASIZE, TCP_NO_DELAY),i); | |
attemptGC(); | |
printResult(clientBlockedChannel(DATASIZE, TCP_NO_DELAY),i); | |
} | |
} | |
/** | |
* Just transfer equivalent amount of data over ArrayBlockingQueue, I suppose as a 'control' set | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
* @throws InterruptedException | |
*/ | |
private static Map<String,String> clientNoNetwork(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException, InterruptedException { | |
Parser p = new Parser("No Network"); | |
final ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<>(bufferSize); | |
//Push data into queue | |
Thread th = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
for(int i=0; i< SENDCOUNT; i++){ | |
try { | |
bq.put(ByteBuffer.allocate(16).putLong(System.nanoTime()).putLong(i)); | |
} catch (InterruptedException e) { e.printStackTrace(); } | |
} | |
} | |
}); | |
th.start(); | |
p.startTimer(); | |
for(int i=0; i < SENDCOUNT; i++){ | |
ByteBuffer bb = bq.take(); | |
bb.flip(); | |
p.process(16, bb); | |
} | |
p.endTimer(); | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
th.join();//just to make sure we didn't accidentally leave the thread running | |
return res; | |
} | |
/** | |
* Connect to server using InputStream. Simplest possible way to connect to server | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientInputstream(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("OIO InputStream"); | |
Socket client = new Socket(InetAddress.getLocalHost(), PORT); | |
client.setTcpNoDelay(TCP_NO_DELAY); | |
InputStream in = client.getInputStream(); | |
try{ | |
p.startTimer(); | |
p.process(in); | |
p.endTimer(); | |
} | |
finally{ | |
client.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(0)); | |
return res; | |
} | |
/** | |
* Connect to server using InputStream, but wrap BufferedInputStream around it. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientBufferedInputstream(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("OIO BufferedInputStream"); | |
Socket client = new Socket(InetAddress.getLocalHost(), PORT); | |
client.setTcpNoDelay(TCP_NO_DELAY); | |
InputStream in = new BufferedInputStream(client.getInputStream(), bufferSize); | |
try{ | |
p.startTimer(); | |
p.process(in); | |
p.endTimer(); | |
} | |
finally{ | |
client.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* Connect to server using DataInputStream, since server will be sending us two 'long' values. Most obvious solution. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientDataInputstream(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("OIO DataInputStream"); | |
Socket client = new Socket(InetAddress.getLocalHost(), PORT); | |
client.setTcpNoDelay(TCP_NO_DELAY); | |
DataInputStream in = new DataInputStream(client.getInputStream()); | |
try{ | |
p.startTimer(); | |
p.process(in); | |
p.endTimer(); | |
} | |
finally{ | |
client.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(0)); | |
return res; | |
} | |
/** | |
* Connect to server using DataInputStream with a buffer. Most obvious solution for someone who thinks about throughput. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientDataBufferedInputstream(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("OIO DataBufferedInputStream"); | |
Socket client = new Socket(InetAddress.getLocalHost(), PORT); | |
client.setTcpNoDelay(TCP_NO_DELAY); | |
DataInputStream in = new DataInputStream(new BufferedInputStream(client.getInputStream(),bufferSize)); | |
try{ | |
p.startTimer(); | |
p.process(in); | |
p.endTimer(); | |
} | |
finally{ | |
client.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* Connect to server using NIO channels. However, someone using NIO isn't likely to fall back to blocking sockets. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientBlockedChannel(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("NIO Blocked Channel"); | |
SocketChannel channel = SocketChannel.open(); | |
channel.setOption(StandardSocketOptions.TCP_NODELAY,TCP_NO_DELAY); | |
channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
ByteBuffer data = ByteBuffer.allocate(bufferSize); | |
int size = 0; | |
try{ | |
p.startTimer(); | |
while(-1 != (size = channel.read(data))){ | |
data.flip(); | |
p.process(size, data); | |
data.clear(); | |
} | |
p.endTimer(); | |
//System.out.println(p); | |
} | |
finally{ | |
channel.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* Connect to server using NIO sockets, but instead of blocking, simply spin in a loop. Polling on steroids, which means | |
* it probably isn't a good idea, unless you REALLY want to win. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientNonBlockedSpinChannel(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
Parser p = new Parser("NIO Non-Blocked Spinning Channel"); | |
SocketChannel channel = SocketChannel.open(); | |
channel.setOption(StandardSocketOptions.TCP_NODELAY,TCP_NO_DELAY); | |
channel.configureBlocking(false); | |
channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
channel.finishConnect(); | |
ByteBuffer data = ByteBuffer.allocate(bufferSize); | |
int size = 0; | |
try{ | |
p.startTimer(); | |
while(-1 != (size = channel.read(data))){ | |
if(size != 0){ | |
data.flip(); | |
p.process(size, data); | |
data.clear(); | |
} | |
} | |
p.endTimer(); | |
//System.out.println(p); | |
} | |
finally{ | |
channel.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* Connect to server using NIO selectors. Most people wouldn't do this for client connections. Selectors are supposed to | |
* be used by highly scalable servers. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientNonBlockedSelectorChannel(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
final Parser p = new Parser("NIO Non-Blocked Selector Channel"); | |
Selector selector = Selector.open(); | |
SocketChannel channel = SocketChannel.open(); | |
channel.setOption(StandardSocketOptions.TCP_NODELAY,TCP_NO_DELAY); | |
channel.configureBlocking(false); | |
channel.register(selector, SelectionKey.OP_READ); | |
channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
ByteBuffer data = ByteBuffer.allocate(bufferSize); | |
int size = 0; | |
channel.finishConnect(); | |
p.startTimer(); | |
try{ | |
while(true){ | |
selector.select(); | |
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); | |
while(iter.hasNext()){ | |
SelectionKey key = iter.next(); | |
iter.remove(); | |
//remove selectionKey, since we are going to deal with it now | |
if(key.isReadable()){ | |
size = ((SocketChannel)key.channel()).read(data); | |
if(size != -1){ | |
data.flip(); | |
p.process(size, data); | |
data.clear(); | |
} | |
else{ | |
p.endTimer(); | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
} | |
} | |
} | |
} | |
catch(Exception e){ | |
e.printStackTrace(); | |
} | |
finally{ | |
channel.close(); | |
} | |
//never called | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* NIO2 way of connecting to servers. Poor documentation and few blogs showing how to do it right. Combines the complexity | |
* of ByteBuffers, async programming and recursive calls. CRUD/JDBC developers should think twice before using this | |
* instead of basic stream. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
* @throws InterruptedException | |
* @throws ExecutionException | |
*/ | |
private static Map<String,String> clientAsyncChannel(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException, InterruptedException, ExecutionException { | |
final Parser p = new Parser("NIO2 Async"); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); | |
channel.setOption(StandardSocketOptions.TCP_NODELAY,TCP_NO_DELAY); | |
Future<Void> connectFuture = channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
connectFuture.get(); | |
final ByteBuffer data = ByteBuffer.allocate(bufferSize); | |
try{ | |
p.startTimer(); | |
channel.read(data, null, new CompletionHandler<Integer, Void>() { | |
@Override | |
public void completed(Integer result, Void att) { | |
final int size = result.intValue(); | |
if(size != -1){ | |
data.flip(); | |
p.process(size, data); | |
data.clear(); | |
channel.read(data,null,this); | |
} | |
else{ | |
p.endTimer(); | |
latch.countDown(); | |
} | |
} | |
@Override | |
public void failed(Throwable exc, Void att) { | |
exc.printStackTrace(); | |
} | |
}); | |
latch.await(); | |
} | |
finally{ | |
channel.close(); | |
} | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(bufferSize)); | |
return res; | |
} | |
/** | |
* Connect to server using Netty's NIO client. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientNettyNIO(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
final Parser p = new Parser("Netty NIO"); | |
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); | |
ClientBootstrap bootstrap = new ClientBootstrap(factory); | |
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { | |
public ChannelPipeline getPipeline() { | |
return Channels.pipeline( | |
//Pass through frame decoder | |
new FrameDecoder() { | |
@Override | |
protected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buf) throws Exception { | |
int readableBytes = buf.readableBytes(); | |
if(readableBytes < 16) return null; | |
int records = readableBytes/16; | |
return buf.readBytes(records*16); | |
} | |
}, | |
//Pass received data to processor | |
new SimpleChannelHandler(){ | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { | |
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); | |
p.process(buf); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { | |
e.getCause().printStackTrace(); | |
e.getChannel().close(); | |
p.endTimer(); | |
} | |
}); | |
} | |
}); | |
bootstrap.setOption("tcpNoDelay", TCP_NO_DELAY); | |
p.startTimer(); | |
ChannelFuture future = bootstrap.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
future.awaitUninterruptibly(); | |
if (!future.isSuccess()) { | |
future.getCause().printStackTrace(); | |
} | |
future.getChannel().getCloseFuture().awaitUninterruptibly(); | |
factory.releaseExternalResources(); | |
p.endTimer(); | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(0)); | |
return res; | |
} | |
/** | |
* Connect to server using Netty's non-NIO client. | |
* @param bufferSize | |
* @param TCP_NO_DELAY | |
* @return | |
* @throws IOException | |
* @throws UnknownHostException | |
*/ | |
private static Map<String,String> clientNettyOIO(int bufferSize, boolean TCP_NO_DELAY) throws IOException, UnknownHostException { | |
final Parser p = new Parser("Netty OIO"); | |
ChannelFactory factory = new OioClientSocketChannelFactory(Executors.newCachedThreadPool()); | |
ClientBootstrap bootstrap = new ClientBootstrap(factory); | |
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { | |
public ChannelPipeline getPipeline() { | |
return Channels.pipeline( | |
//Pass through frame decoder | |
new FrameDecoder() { | |
@Override | |
protected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buf) throws Exception { | |
int readableBytes = buf.readableBytes(); | |
if(readableBytes < 16) return null; | |
int records = readableBytes/16; | |
return buf.readBytes(records*16); | |
} | |
}, | |
//Pass received data to processor | |
new SimpleChannelHandler(){ | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { | |
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); | |
p.process(buf); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { | |
e.getCause().printStackTrace(); | |
e.getChannel().close(); | |
p.endTimer(); | |
} | |
}); | |
} | |
}); | |
bootstrap.setOption("tcpNoDelay", TCP_NO_DELAY); | |
p.startTimer(); | |
ChannelFuture future = bootstrap.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT)); | |
future.awaitUninterruptibly(); | |
if (!future.isSuccess()) { | |
future.getCause().printStackTrace(); | |
} | |
future.getChannel().getCloseFuture().awaitUninterruptibly(); | |
factory.releaseExternalResources(); | |
p.endTimer(); | |
Map<String,String> res = p.getResults(); | |
res.put("TCP_NO_DELAY", Boolean.toString(TCP_NO_DELAY)); | |
res.put("BUFFER_SIZE", df.format(0)); | |
return res; | |
} | |
/** | |
* | |
* Very simple server which handles only one client at a time and simply sends them two 'long' values in a tight loop. | |
* | |
*/ | |
static final class Server extends Thread{ | |
public void run(){ | |
try { | |
final ServerSocket server = new ServerSocket(PORT); | |
while(true){ | |
final Socket c1 = server.accept(); | |
c1.setTcpNoDelay(true); | |
long counter = 0; | |
DataOutputStream serverout; | |
try { | |
serverout = new DataOutputStream(c1.getOutputStream()); | |
for(int i=0;i<SENDCOUNT;i++){ | |
serverout.writeLong(System.nanoTime()); | |
serverout.writeLong(counter); | |
counter++; | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
finally{ | |
try { c1.close(); } catch (IOException e) { e.printStackTrace();} | |
} | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** | |
* Logic to extract data out of ByteBuffers or streams and process them. | |
* | |
*/ | |
static final class Parser{ | |
//Not thread safe | |
private final String id; | |
//One of the following is used, depending on the implementation | |
private final byte[] internalBufferBA = new byte[8]; | |
private final ByteBuffer internalBufferBB = ByteBuffer.allocate(8); | |
private int size = 0; | |
private long counter = 0; | |
private int totalBytes = 0; | |
private long minLatency = Long.MAX_VALUE; | |
private long maxLatency = Long.MIN_VALUE; | |
private long sumLatency = 0; | |
private long startTime = 0; | |
private long endTime = 0; | |
public Parser(String id){ | |
this.id = id; | |
} | |
public long startTimer(){ return startTime = System.nanoTime(); } | |
public long endTimer(){ return endTime = System.nanoTime(); } | |
public long duration(){ return endTime - startTime; } | |
public void process(InputStream input) throws IOException{ | |
while(true){ | |
size = input.read(internalBufferBA, 0, 8); | |
if(size == -1) break; | |
long remoteTS = toLong(internalBufferBA); | |
size = input.read(internalBufferBA, 0, 8); | |
if(size == -1) break; | |
long remoteCounter = toLong(internalBufferBA); | |
if(remoteCounter != counter){ | |
String error = "Expected remote counter to be "+counter+" but it was actually "+remoteCounter; | |
out.println(error); | |
throw new RuntimeException(error); | |
} | |
long localTS = System.nanoTime(); | |
long latency = localTS - remoteTS; | |
if(latency > maxLatency) maxLatency = latency; | |
if(latency < minLatency) minLatency = latency; | |
sumLatency += latency; | |
totalBytes+=16; | |
counter++; | |
} | |
} | |
public void process(DataInputStream input) throws IOException{ | |
try{ | |
while(true){ | |
long remoteTS = input.readLong(); | |
long remoteCounter = input.readLong(); | |
if(remoteCounter != counter){ | |
String error = "Expected remote counter to be "+counter+" but it was actually "+remoteCounter; | |
System.out.println(error); | |
throw new RuntimeException(error); | |
} | |
long localTS = System.nanoTime(); | |
long latency = localTS - remoteTS; | |
if(latency > maxLatency) maxLatency = latency; | |
if(latency < minLatency) minLatency = latency; | |
sumLatency += latency; | |
totalBytes+=16; | |
counter++; | |
} | |
} | |
catch(EOFException e){ | |
//EOF reached, how else can this be detected? | |
} | |
} | |
public void process(int size, ByteBuffer buf){ | |
//if internal buffer is empty and buf is empty, end loop | |
//if internal buffer is empty and buf contains only 1 val, move buf to internal buffer, end loop | |
//if internal buffer is empty and buf contains atleast 2 'long' vals, read them and loop | |
//if internal buffer is full and buf contains atleast 1 'long val, read both and loop | |
while(buf.remaining() > 0){ | |
long remoteTS = 0; | |
long remoteCounter = 0; | |
if(internalBufferBB.position() == 0 && buf.remaining() == 0){ | |
break; | |
} | |
if(internalBufferBB.position() == 0 && buf.remaining() == 8){ | |
//buf.flip(); | |
long longVal= buf.getLong(); | |
internalBufferBB.putLong(longVal); | |
break; | |
} | |
if(internalBufferBB.position() == 0 && buf.remaining() >= 16){ | |
//buf.flip(); | |
remoteTS = buf.getLong(); | |
remoteCounter = buf.getLong(); | |
} | |
if(internalBufferBB.position() == 8 && buf.remaining() >= 8){ | |
internalBufferBB.flip(); | |
remoteTS = internalBufferBB.getLong(); | |
internalBufferBB.clear(); | |
//buf.flip(); | |
remoteCounter = buf.getLong(); | |
} | |
if(remoteCounter != counter){ | |
String error = "Expected remote counter to be "+counter+" but it was actually "+remoteCounter; | |
System.out.println(error); | |
throw new RuntimeException(error); | |
} | |
long localTS = System.nanoTime(); | |
long latency = localTS - remoteTS; | |
//out.println(String.format("latency:%s, remoteTS:%s, localTS:%s", latency, remoteTS, localTS)); | |
if(latency > maxLatency) maxLatency = latency; | |
if(latency < minLatency) minLatency = latency; | |
sumLatency += latency; | |
totalBytes+=16; | |
counter++; | |
} | |
} | |
public void process(ChannelBuffer buf) { | |
while(buf.readableBytes() >= 16){ | |
long remoteTS = 0; | |
long remoteCounter = 0; | |
remoteTS = buf.readLong(); | |
remoteCounter = buf.readLong(); | |
if(remoteCounter != counter){ | |
String error = "Expected remote counter to be "+counter+" but it was actually "+remoteCounter+", while TS was "+remoteTS; | |
System.out.println(error); | |
throw new RuntimeException(error); | |
} | |
long localTS = System.nanoTime(); | |
long latency = localTS - remoteTS; | |
if(latency > maxLatency) maxLatency = latency; | |
if(latency < minLatency) minLatency = latency; | |
sumLatency += latency; | |
totalBytes+=16; | |
counter++; | |
} | |
} | |
public Map<String,String> getResults(){ | |
final Map<String,String> res = new HashMap<String, String>(); | |
res.put("ID", id); | |
res.put("MIN_LATENCY", df.format(minLatency/1000d)); | |
res.put("MAX_LATENCY", df.format(maxLatency /1000d)); | |
res.put("AVG_LATENCY", df.format((counter==0? 0 :(sumLatency/counter))/1000d)); | |
res.put("MICROSECS_PER_BYTE", df.format(totalBytes==0?0:((((double)duration())/1000d)/((double)totalBytes)))); | |
res.put("MSGS_PER_MILLISEC", df.format(duration()==0? 0 : ((double)counter)/(((double)duration())/1000d/1000d))); | |
return res; | |
} | |
} | |
/* | |
* Convert byte array to long | |
* http://www.daniweb.com/software-development/java/code/216874/primitive-types-as-byte-arrays | |
*/ | |
public static long toLong(byte[] data) { | |
//if (data == null || data.length != 8) return 0x0; | |
// ---------- | |
return (long)( | |
// (Below) convert to longs before shift because digits | |
// are lost with ints beyond the 32-bit limit | |
(long)(0xff & data[0]) << 56 | | |
(long)(0xff & data[1]) << 48 | | |
(long)(0xff & data[2]) << 40 | | |
(long)(0xff & data[3]) << 32 | | |
(long)(0xff & data[4]) << 24 | | |
(long)(0xff & data[5]) << 16 | | |
(long)(0xff & data[6]) << 8 | | |
(long)(0xff & data[7]) << 0 | |
); | |
} | |
private static void printResult(Map<String, String> result, int iteration) { | |
StringBuilder sb = new StringBuilder(); | |
sb.append(result.get("ID")).append(","); | |
sb.append(result.get("MIN_LATENCY")).append(","); | |
sb.append(result.get("MAX_LATENCY")).append(","); | |
sb.append(result.get("AVG_LATENCY")).append(","); | |
sb.append(result.get("MICROSECS_PER_BYTE")).append(","); | |
sb.append(result.get("MSGS_PER_MILLISEC")).append(","); | |
sb.append(result.get("TCP_NO_DELAY")).append(","); | |
sb.append(result.get("BUFFER_SIZE")).append(","); | |
sb.append(df.format(iteration)); | |
out.println(sb); | |
} | |
private static void attemptGC(){ | |
//this is just a hint, I know. | |
for(int i=0; i< 100; i++)System.gc(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A blog post about this post at: http://falconair.github.com/posts/2013/02/25/java-apis-for-socket-programming/java-apis-for-socket-programming.html