Created
April 7, 2012 00:41
-
-
Save shouichi/2324311 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.channels.SocketChannel; | |
/**
 * A deferred selector operation.
 *
 * <p>Instances are queued by {@code NioClient} and {@code NioServer} and later
 * applied by the thread that runs the selector loop: a {@link #REGISTER}
 * request registers {@link #socket} with the selector using {@link #ops},
 * a {@link #CHANGEOPS} request replaces the interest set of the channel's
 * existing key with {@link #ops}.
 */
public class ChangeRequest {

    /** Request type: register the channel with the selector. */
    public static final int REGISTER = 1;

    /** Request type: change the interest ops of an already registered channel. */
    public static final int CHANGEOPS = 2;

    /** Channel the request applies to. */
    public SocketChannel socket;

    /** One of {@link #REGISTER} or {@link #CHANGEOPS}. */
    public int type;

    /** Interest-op bits (e.g. {@code SelectionKey.OP_WRITE}) to apply. */
    public int ops;

    /**
     * Creates a request.
     *
     * @param socket channel the request applies to
     * @param type   {@link #REGISTER} or {@link #CHANGEOPS}
     * @param ops    interest-op bits to register or to set
     */
    public ChangeRequest(SocketChannel socket, int type, int ops) {
        this.ops = ops;
        this.type = type;
        this.socket = socket;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.channels.SocketChannel; | |
import java.util.LinkedList; | |
import java.util.List; | |
public class EchoWorker implements Runnable { | |
private List<ServerDataEvent> queue = new LinkedList<ServerDataEvent>(); | |
public void processData(NioServer server, SocketChannel socket, byte[] data, int count) { | |
byte[] dataCopy = new byte[count]; | |
System.arraycopy(data, 0, dataCopy, 0, count); | |
synchronized(queue) { | |
queue.add(new ServerDataEvent(server, socket, dataCopy)); | |
queue.notify(); | |
} | |
} | |
public void run() { | |
ServerDataEvent dataEvent; | |
while(true) { | |
// Wait for data to become available | |
synchronized(queue) { | |
while(queue.isEmpty()) { | |
try { | |
queue.wait(); | |
} catch (InterruptedException e) { | |
} | |
} | |
dataEvent = (ServerDataEvent) queue.remove(0); | |
} | |
// Return to sender | |
dataEvent.server.send(dataEvent.socket, dataEvent.data); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Java PathFinder (JPF) project configuration for the NIO echo example.
# NOTE(review): key meanings below follow the usual JPF conventions; confirm
# against the JPF / jpf-net-iocache version in use.

# Project root and the locations JPF searches for classes and sources.
nio-server = ${config_path}
nio-server.classpath = ${config_path}
nio-server.sourcepath = ${config_path}

# Command jpf-net-iocache runs to start the peer process (see peer.sh).
jpf-net-iocache.boot.peer.command=${config_path}/peer.sh

# Sections printed by the console reporter when a property violation is found.
report.console.property_violation=result,error,trace,snapshot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build / run helpers for the NIO echo server and client.
#
#   make            compile all sources
#   make server     run the echo server
#   make client     run the client
#   make jpf-server run the server under Java PathFinder
#   make jpf-client run the client under Java PathFinder
#   make clean      remove compiled classes

RM=rm -f
JAVAC=javac
JAVAC_FLAGS=-g #-Xlint:unchecked
JAVA=java
JAVA_FLAGS=-ea
JPF=~/jpf/jpf-core/bin/jpf

SOURCES=ChangeRequest.java EchoWorker.java NioClient.java\
	NioServer.java RspHandler.java ServerDataEvent.java
CLASSES=$(SOURCES:.java=.class)

# None of these targets produce a file of the same name; declaring them phony
# keeps a stray file called e.g. "server" or "client" from disabling them.
.PHONY: all compile server client jpf-server jpf-client clean

all: compile

compile: $(CLASSES)

server: compile
	$(JAVA) $(JAVA_FLAGS) NioServer

client: compile
	$(JAVA) $(JAVA_FLAGS) NioClient

jpf-server: compile
	$(JPF) NioServer

jpf-client: compile
	$(JPF) NioClient

# Drop the built-in suffix rules and define only .java -> .class
.SUFFIXES:
.SUFFIXES: .java .class

.java.class:
	$(JAVAC) $(JAVAC_FLAGS) $<

clean:
	$(RM) *.class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.net.InetAddress; | |
import java.net.InetSocketAddress; | |
import java.net.Socket; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.nio.channels.spi.SelectorProvider; | |
import java.util.*; | |
public class NioClient implements Runnable { | |
// The host:port combination to connect to | |
private InetAddress hostAddress; | |
private int port; | |
// The selector we'll be monitoring | |
private Selector selector; | |
// The buffer into which we'll read data when it's available | |
private ByteBuffer readBuffer = ByteBuffer.allocate(8192); | |
// A list of PendingChange instances | |
private List pendingChanges = new LinkedList(); | |
// Maps a SocketChannel to a list of ByteBuffer instances | |
private Map pendingData = new HashMap(); | |
// Maps a SocketChannel to a RspHandler | |
private Map rspHandlers = Collections.synchronizedMap(new HashMap()); | |
public NioClient(InetAddress hostAddress, int port) throws IOException { | |
this.hostAddress = hostAddress; | |
this.port = port; | |
this.selector = this.initSelector(); | |
} | |
public void send(byte[] data, RspHandler handler) throws IOException { | |
// Start a new connection | |
SocketChannel socket = this.initiateConnection(); | |
// Register the response handler | |
this.rspHandlers.put(socket, handler); | |
// And queue the data we want written | |
synchronized (this.pendingData) { | |
List queue = (List) this.pendingData.get(socket); | |
if (queue == null) { | |
queue = new ArrayList(); | |
this.pendingData.put(socket, queue); | |
} | |
queue.add(ByteBuffer.wrap(data)); | |
} | |
// Finally, wake up our selecting thread so it can make the required changes | |
this.selector.wakeup(); | |
} | |
public void run() { | |
while (true) { | |
try { | |
// Process any pending changes | |
synchronized (this.pendingChanges) { | |
Iterator changes = this.pendingChanges.iterator(); | |
while (changes.hasNext()) { | |
ChangeRequest change = (ChangeRequest) changes.next(); | |
switch (change.type) { | |
case ChangeRequest.CHANGEOPS: | |
SelectionKey key = change.socket.keyFor(this.selector); | |
key.interestOps(change.ops); | |
break; | |
case ChangeRequest.REGISTER: | |
change.socket.register(this.selector, change.ops); | |
break; | |
} | |
} | |
this.pendingChanges.clear(); | |
} | |
// Wait for an event one of the registered channels | |
this.selector.select(100); | |
// Iterate over the set of keys for which events are available | |
Iterator selectedKeys = this.selector.selectedKeys().iterator(); | |
while (selectedKeys.hasNext()) { | |
SelectionKey key = (SelectionKey) selectedKeys.next(); | |
selectedKeys.remove(); | |
if (!key.isValid()) { | |
continue; | |
} | |
// Check what event is available and deal with it | |
if (key.isConnectable()) { | |
this.finishConnection(key); | |
} else if (key.isReadable()) { | |
this.read(key); | |
} else if (key.isWritable()) { | |
this.write(key); | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
private void read(SelectionKey key) throws IOException { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// Clear out our read buffer so it's ready for new data | |
this.readBuffer.clear(); | |
// Attempt to read off the channel | |
int numRead; | |
try { | |
numRead = socketChannel.read(this.readBuffer); | |
} catch (IOException e) { | |
// The remote forcibly closed the connection, cancel | |
// the selection key and close the channel. | |
key.cancel(); | |
socketChannel.close(); | |
return; | |
} | |
if (numRead == -1) { | |
// Remote entity shut the socket down cleanly. Do the | |
// same from our end and cancel the channel. | |
key.channel().close(); | |
key.cancel(); | |
return; | |
} | |
// Handle the response | |
this.handleResponse(socketChannel, this.readBuffer.array(), numRead); | |
} | |
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException { | |
// Make a correctly sized copy of the data before handing it | |
// to the client | |
byte[] rspData = new byte[numRead]; | |
System.arraycopy(data, 0, rspData, 0, numRead); | |
// Look up the handler for this channel | |
RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel); | |
// And pass the response to it | |
if (handler.handleResponse(rspData)) { | |
// The handler has seen enough, close the connection | |
socketChannel.close(); | |
socketChannel.keyFor(this.selector).cancel(); | |
} | |
} | |
private void write(SelectionKey key) throws IOException { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
synchronized (this.pendingData) { | |
List queue = (List) this.pendingData.get(socketChannel); | |
// Write until there's not more data ... | |
while (!queue.isEmpty()) { | |
ByteBuffer buf = (ByteBuffer) queue.get(0); | |
socketChannel.write(buf); | |
if (buf.remaining() > 0) { | |
// ... or the socket's buffer fills up | |
break; | |
} | |
queue.remove(0); | |
} | |
if (queue.isEmpty()) { | |
// We wrote away all data, so we're no longer interested | |
// in writing on this socket. Switch back to waiting for | |
// data. | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
} | |
} | |
private void finishConnection(SelectionKey key) throws IOException { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// Finish the connection. If the connection operation failed | |
// this will raise an IOException. | |
try { | |
socketChannel.finishConnect(); | |
} catch (IOException e) { | |
// Cancel the channel's registration with our selector | |
System.out.println(e); | |
key.cancel(); | |
return; | |
} | |
// Register an interest in writing on this channel | |
key.interestOps(SelectionKey.OP_WRITE); | |
} | |
private SocketChannel initiateConnection() throws IOException { | |
// Create a non-blocking socket channel | |
SocketChannel socketChannel = SocketChannel.open(); | |
socketChannel.configureBlocking(false); | |
// Kick off connection establishment | |
socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port)); | |
// Queue a channel registration since the caller is not the | |
// selecting thread. As part of the registration we'll register | |
// an interest in connection events. These are raised when a channel | |
// is ready to complete connection establishment. | |
synchronized(this.pendingChanges) { | |
this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT)); | |
} | |
return socketChannel; | |
} | |
private Selector initSelector() throws IOException { | |
// Create a new selector | |
return SelectorProvider.provider().openSelector(); | |
} | |
public static void main(String[] args) { | |
try { | |
NioClient client = new NioClient(InetAddress.getByName("localhost"), 9090); | |
Thread t = new Thread(client); | |
t.setDaemon(true); | |
t.start(); | |
RspHandler handler = new RspHandler(); | |
client.send("GET / HTTP/1.0\r\n\r\n".getBytes(), handler); | |
handler.waitForResponse(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.net.InetAddress; | |
import java.net.InetSocketAddress; | |
import java.net.Socket; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.nio.channels.spi.SelectorProvider; | |
import java.util.*; | |
public class NioServer implements Runnable { | |
// The host:port combination to listen on | |
private InetAddress hostAddress; | |
private int port; | |
// The channel on which we'll accept connections | |
private ServerSocketChannel serverChannel; | |
// The selector we'll be monitoring | |
private Selector selector; | |
// The buffer into which we'll read data when it's available | |
private ByteBuffer readBuffer = ByteBuffer.allocate(8192); | |
private EchoWorker worker; | |
// A list of PendingChange instances | |
private List pendingChanges = new LinkedList(); | |
// Maps a SocketChannel to a list of ByteBuffer instances | |
private Map pendingData = new HashMap(); | |
public NioServer(InetAddress hostAddress, int port, EchoWorker worker) throws IOException { | |
this.hostAddress = hostAddress; | |
this.port = port; | |
this.selector = this.initSelector(); | |
this.worker = worker; | |
} | |
public void send(SocketChannel socket, byte[] data) { | |
synchronized (this.pendingChanges) { | |
// Indicate we want the interest ops set changed | |
this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE)); | |
// And queue the data we want written | |
synchronized (this.pendingData) { | |
List queue = (List) this.pendingData.get(socket); | |
if (queue == null) { | |
queue = new ArrayList(); | |
this.pendingData.put(socket, queue); | |
} | |
queue.add(ByteBuffer.wrap(data)); | |
} | |
} | |
// Finally, wake up our selecting thread so it can make the required changes | |
this.selector.wakeup(); | |
} | |
public void run() { | |
while (true) { | |
try { | |
// Process any pending changes | |
synchronized (this.pendingChanges) { | |
Iterator changes = this.pendingChanges.iterator(); | |
while (changes.hasNext()) { | |
ChangeRequest change = (ChangeRequest) changes.next(); | |
switch (change.type) { | |
case ChangeRequest.CHANGEOPS: | |
SelectionKey key = change.socket.keyFor(this.selector); | |
if (key != null) { | |
key.interestOps(change.ops); | |
} | |
} | |
} | |
this.pendingChanges.clear(); | |
} | |
// Wait for an event one of the registered channels | |
this.selector.select(100); | |
// Iterate over the set of keys for which events are available | |
Iterator selectedKeys = this.selector.selectedKeys().iterator(); | |
while (selectedKeys.hasNext()) { | |
SelectionKey key = (SelectionKey) selectedKeys.next(); | |
selectedKeys.remove(); | |
if (!key.isValid()) { | |
continue; | |
} | |
// Check what event is available and deal with it | |
if (key.isAcceptable()) { | |
this.accept(key); | |
} else if (key.isReadable()) { | |
this.read(key); | |
} else if (key.isWritable()) { | |
this.write(key); | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
private void accept(SelectionKey key) throws IOException { | |
// For an accept to be pending the channel must be a server socket channel. | |
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); | |
// Accept the connection and make it non-blocking | |
SocketChannel socketChannel = serverSocketChannel.accept(); | |
Socket socket = socketChannel.socket(); | |
socketChannel.configureBlocking(false); | |
// Register the new SocketChannel with our Selector, indicating | |
// we'd like to be notified when there's data waiting to be read | |
socketChannel.register(this.selector, SelectionKey.OP_READ); | |
} | |
private void read(SelectionKey key) throws IOException { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// Clear out our read buffer so it's ready for new data | |
this.readBuffer.clear(); | |
// Attempt to read off the channel | |
int numRead; | |
try { | |
numRead = socketChannel.read(this.readBuffer); | |
} catch (IOException e) { | |
// The remote forcibly closed the connection, cancel | |
// the selection key and close the channel. | |
key.cancel(); | |
socketChannel.close(); | |
return; | |
} | |
if (numRead == -1) { | |
// Remote entity shut the socket down cleanly. Do the | |
// same from our end and cancel the channel. | |
key.channel().close(); | |
key.cancel(); | |
return; | |
} | |
// Hand the data off to our worker thread | |
this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead); | |
} | |
private void write(SelectionKey key) throws IOException { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
synchronized (this.pendingData) { | |
List queue = (List) this.pendingData.get(socketChannel); | |
// Write until there's not more data ... | |
while (!queue.isEmpty()) { | |
ByteBuffer buf = (ByteBuffer) queue.get(0); | |
socketChannel.write(buf); | |
if (buf.remaining() > 0) { | |
// ... or the socket's buffer fills up | |
break; | |
} | |
queue.remove(0); | |
} | |
if (queue.isEmpty()) { | |
// We wrote away all data, so we're no longer interested | |
// in writing on this socket. Switch back to waiting for | |
// data. | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
} | |
} | |
private Selector initSelector() throws IOException { | |
// Create a new selector | |
Selector socketSelector = SelectorProvider.provider().openSelector(); | |
// Create a new non-blocking server socket channel | |
this.serverChannel = ServerSocketChannel.open(); | |
serverChannel.configureBlocking(false); | |
// Bind the server socket to the specified address and port | |
InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); | |
serverChannel.socket().bind(isa); | |
// Register the server socket channel, indicating an interest in | |
// accepting new connections | |
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); | |
return socketSelector; | |
} | |
public static void main(String[] args) { | |
try { | |
EchoWorker worker = new EchoWorker(); | |
new Thread(worker).start(); | |
new Thread(new NioServer(null, 9090, worker)).start(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Peer launcher: builds (if needed) and runs the NIO client via the Makefile.
# Change to the directory holding this script first, so the Makefile is found
# no matter which working directory the script is started from.
cd "$(dirname "$0")" || exit 1
exec make client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * One-shot rendezvous between the thread that receives a response and the
 * thread(s) waiting for it.
 */
public class RspHandler {
    // The received response; null until handleResponse has been called.
    // Guarded by this object's monitor.
    private byte[] rsp = null;

    /**
     * Stores the response and wakes every thread blocked in
     * {@link #waitForResponse()}.
     *
     * @param rsp the response bytes
     * @return always {@code true}; {@code NioClient} closes the connection
     *         when this method returns {@code true}
     */
    public synchronized boolean handleResponse(byte[] rsp) {
        this.rsp = rsp;
        // notifyAll, not notify: with several waiters a single notify would
        // leave all but one of them blocked forever.
        this.notifyAll();
        return true;
    }

    /**
     * Blocks until a response has been stored, then prints it to standard
     * output decoded as UTF-8.
     *
     * <p>An interrupt does not end the wait; it is remembered and the
     * thread's interrupt status is restored before this method returns.
     */
    public synchronized void waitForResponse() {
        boolean interrupted = false;
        try {
            while (this.rsp == null) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but do not lose the interrupt
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println(new String(this.rsp, java.nio.charset.Charset.forName("UTF-8")));
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.channels.SocketChannel; | |
class ServerDataEvent { | |
public NioServer server; | |
public SocketChannel socket; | |
public byte[] data; | |
public ServerDataEvent(NioServer server, SocketChannel socket, byte[] data) { | |
this.server = server; | |
this.socket = socket; | |
this.data = data; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment