-
-
Save pcdinh/1575118 to your computer and use it in GitHub Desktop.
Pure Java NIO scalability test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Pure Java NIO scalability test: a minimal HTTP server that answers every chunk of input it
 * reads with a canned {@code Hello World!} response.
 *
 * <p>The main thread owns the {@link Selector}. It accepts connections, pins each one to a
 * randomly chosen {@link TaskRunner} (one runner per available processor) and forwards readiness
 * notifications to that runner through a fixed pool of recycled {@link SelectionEvent} objects.
 * Each runner performs the actual socket reads and writes on its own thread.
 *
 * @author Jon Brisbin &lt;[email protected]&gt;
 */
public class NioScalabilityTest {

    /**
     * Starts the server on 127.0.0.1:3000 and runs the selector loop forever.
     *
     * @param args ignored
     * @throws InterruptedException if the selector thread is interrupted while waiting for a free
     *     {@link SelectionEvent}
     * @throws IOException if the listening socket cannot be set up or an accept/register fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        final int cores = Runtime.getRuntime().availableProcessors();
        final int bufferSize = 4 * 1024;
        final int ringSize = 512;
        final Random random = new Random();

        // Written by the selector thread and removed from by every runner thread, so it must be
        // a concurrent map; a plain HashMap can be corrupted by that access pattern.
        final Map<SocketChannel, TaskRunner> socketToRunnerMap = new ConcurrentHashMap<>();

        // Fixed pool of reusable events. When it is empty the selector thread blocks in take(),
        // which throttles dispatching until a runner hands an event back.
        final BlockingDeque<SelectionEvent> readyEvents = new LinkedBlockingDeque<>(ringSize);
        for (int i = 0; i < ringSize; i++) {
            readyEvents.add(new SelectionEvent());
        }

        final TaskRunner[] runners = new TaskRunner[cores];
        for (int i = 0; i < cores; i++) {
            runners[i] = new TaskRunner(ringSize, readyEvents, socketToRunnerMap);
        }

        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // Listening-socket options are applied once, before bind(): changing SO_REUSEADDR on an
        // already bound socket has undefined behaviour.
        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);

        final Selector selector = Selector.open();
        serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 3000), 1024);
        System.out.println("Listening on localhost:3000...");
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            int ready = 0;
            try {
                ready = selector.select();
            } catch (CancelledKeyException ignored) {
                // Runner threads cancel keys concurrently; treat this as "nothing ready" and
                // simply select again.
            }
            if (ready == 0) {
                continue;
            }

            final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                final SelectionKey key = keys.next();
                keys.remove();
                try {
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // Drain every pending connection; accept() returns null when none is left.
                        SocketChannel channel;
                        while (null != (channel = serverSocketChannel.accept())) {
                            configureClient(channel, bufferSize);
                            final SelectionKey readKey =
                                channel.register(selector, SelectionKey.OP_READ);
                            final int runnerId = random.nextInt(cores);
                            final TaskRunner runner = runners[runnerId];
                            // Record the runner on the connection's own key, not on the shared
                            // accept key where every new connection would overwrite it.
                            readKey.attach(runnerId);
                            socketToRunnerMap.put(channel, runner);
                            dispatch(runner, readyEvents, channel, readKey);
                        }
                    } else if (key.isReadable()) {
                        // NOTE(review): the key stays registered for OP_READ while a runner is
                        // still working on the channel, so the same channel can be dispatched
                        // more than once before its data is consumed.
                        final SocketChannel channel = (SocketChannel) key.channel();
                        final TaskRunner runner = socketToRunnerMap.get(channel);
                        if (null != runner) {
                            dispatch(runner, readyEvents, channel, key);
                        }
                    }
                } catch (CancelledKeyException ignored) {
                    // A runner cancelled this key between isValid() and the readiness check;
                    // the connection is already being torn down, so skip it.
                }
            }
        }
    }

    /** Switches a freshly accepted connection to non-blocking mode and applies socket options. */
    private static void configureClient(SocketChannel channel, int bufferSize) throws IOException {
        channel.configureBlocking(false);
        channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        channel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);
        channel.setOption(StandardSocketOptions.SO_SNDBUF, bufferSize);
    }

    /** Takes a free event from the pool, fills it in and queues it on the given runner. */
    private static void dispatch(
            TaskRunner runner,
            BlockingDeque<SelectionEvent> readyEvents,
            SocketChannel channel,
            SelectionKey key) throws InterruptedException {
        final SelectionEvent event = readyEvents.take();
        event.channel = channel;
        event.key = key;
        // add() cannot overflow: every runner queue has the same capacity as the event pool.
        runner.events.add(event);
    }

    /**
     * Prints the stack trace of an I/O failure unless it is one of the messages this server
     * treats as an ordinary client disconnect (no message, "Connection reset by peer",
     * "Broken pipe").
     */
    private static void reportUnlessDisconnect(IOException e) {
        switch (String.valueOf(e.getMessage())) {
            case "null":
            case "Connection reset by peer":
            case "Broken pipe":
                break;
            default:
                e.printStackTrace();
        }
    }

    /**
     * Reads from {@code channel} into {@code dst}, closing the channel if the read fails.
     *
     * @param channel the channel to read from
     * @param dst the buffer that receives the data
     * @return the value returned by {@link ReadableByteChannel#read(ByteBuffer)}, or {@code -1}
     *     if the read failed; the channel is closed only in the failure case
     * @throws IOException if closing the channel after a failed read fails
     */
    static int safeRead(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
        int read = -1;
        try {
            read = channel.read(dst);
        } catch (IOException e) {
            reportUnlessDisconnect(e);
            channel.close();
        } catch (CancelledKeyException e) {
            channel.close();
        }
        return read;
    }

    /**
     * Writes {@code src} to {@code channel}, closing the channel if the write fails.
     *
     * @param channel the channel to write to
     * @param src the buffer holding the bytes to send
     * @return the value returned by {@link WritableByteChannel#write(ByteBuffer)}, or {@code -1}
     *     if the write failed (in which case the channel has been closed)
     * @throws IOException if closing the channel after a failed write fails
     */
    static int safeWrite(WritableByteChannel channel, ByteBuffer src) throws IOException {
        int written = -1;
        try {
            written = channel.write(src);
        } catch (IOException e) {
            reportUnlessDisconnect(e);
            channel.close();
        } catch (CancelledKeyException e) {
            channel.close();
        }
        return written;
    }

    /**
     * Reusable carrier that tells a {@link TaskRunner} which connection is ready. The selector
     * thread fills in {@link #channel} and {@link #key}; the runner uses {@link #buffer} as its
     * read buffer and then returns the event to the shared pool.
     */
    static class SelectionEvent {
        SocketChannel channel;
        SelectionKey key;
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);

        SelectionEvent() {
        }
    }

    /**
     * Worker that serves the connections assigned to it. Each runner starts itself on its own
     * single-thread executor and blocks on {@link #events} until the selector thread queues work.
     */
    static class TaskRunner implements Runnable {
        final BlockingDeque<SelectionEvent> events;
        final BlockingDeque<SelectionEvent> readyEvents;
        final Map<SocketChannel, TaskRunner> socketToRunnerMap;
        final Executor executor = Executors.newSingleThreadExecutor();
        // Canned response; callers write a duplicate() so this buffer's position never moves.
        final ByteBuffer msg = ByteBuffer.wrap(
            ("HTTP/1.1 200 OK\r\n" +
                "Connection: Keep-Alive\r\n" +
                "Content-Type: text/plain\r\n" +
                "Content-Length: 12\r\n\r\n" +
                "Hello World!").getBytes(StandardCharsets.US_ASCII)
        );

        /**
         * @param size capacity of this runner's work queue
         * @param readyEvents shared pool that processed events are returned to
         * @param socketToRunnerMap shared connection-to-runner map; entries are removed here when
         *     a connection ends
         */
        TaskRunner(int size, BlockingDeque<SelectionEvent> readyEvents, Map<SocketChannel, TaskRunner> socketToRunnerMap) {
            this.events = new LinkedBlockingDeque<>(size);
            this.readyEvents = readyEvents;
            this.socketToRunnerMap = socketToRunnerMap;
            // Started last so every field is assigned before the worker thread can observe it.
            executor.execute(this);
        }

        /** Processes queued events until the thread is interrupted. */
        @Override public void run() {
            try {
                while (true) {
                    final SelectionEvent ev = events.take();
                    try {
                        serve(ev);
                    } catch (IOException e) {
                        // A failure on one connection must not kill the runner: every other
                        // connection pinned to it would stall. Report it and drop the connection.
                        e.printStackTrace();
                        disconnect(ev);
                    } finally {
                        // Do not keep finished connections reachable from the pooled event.
                        ev.channel = null;
                        ev.key = null;
                        readyEvents.push(ev);
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        /**
         * Reads everything currently available on the event's channel, sending one canned
         * response per successful read, and tears the connection down at end of stream or error.
         */
        private void serve(SelectionEvent ev) throws IOException {
            ev.buffer.clear();
            int read = safeRead(ev.channel, ev.buffer);
            while (read > 0) {
                // NOTE(review): the number of bytes written is not checked, so a short write on
                // the non-blocking channel would truncate the response.
                safeWrite(ev.channel, msg.duplicate());
                // Read the data into memory
                ev.buffer.flip();
                byte[] bytes = new byte[ev.buffer.remaining()];
                ev.buffer.get(bytes);
                //String input = new String(bytes);
                ev.buffer.clear();
                read = safeRead(ev.channel, ev.buffer);
            }
            if (read < 0) {
                disconnect(ev);
            }
        }

        /**
         * Forgets the connection, cancels its key and closes its channel. On a clean end of
         * stream nothing else closes the channel, so it has to happen here. Safe to call more
         * than once for the same connection.
         */
        private void disconnect(SelectionEvent ev) {
            socketToRunnerMap.remove(ev.channel);
            ev.key.cancel();
            try {
                ev.channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment