Skip to content

Instantly share code, notes, and snippets.

@ojacobson
Last active December 14, 2015 14:59
Show Gist options
  • Save ojacobson/5104710 to your computer and use it in GitHub Desktop.
Save ojacobson/5104710 to your computer and use it in GitHub Desktop.
package com.example.onepointeight;
import java.io.IOException;
import java.nio.channels.SocketChannel;
/**
* Invoked when a new connection is available on a channel.
*/
public interface AcceptCallback {
public void accepted(SocketChannel channel) throws IOException;
}
package com.example.onepointeight;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class Reactor {
public static void main(String[] args) throws IOException {
Reactor.run((reactor) -> {
reactor.listen(3000, (client) -> {
reactor.read(client, (data) -> {
data.flip();
reactor.write(client, data);
});
});
});
}
private final Map<SelectableChannel, SelectionKey> keys = new HashMap<>();
private final Map<SelectionKey, AcceptCallback> listeners = new HashMap<>();
private final Map<SelectionKey, ReadableCallback> readers = new HashMap<>();
private final Selector selector;
public static void run(ReactorCallback program) throws IOException {
Reactor r = new Reactor();
r.start(program);
}
private Reactor() throws IOException {
selector = Selector.open();
}
public void write(SocketChannel client, ByteBuffer data) throws IOException {
client.write(data);
}
public void read(SocketChannel client, ReadableCallback reader) {
SelectionKey key = keys.get(client);
int interests = key.interestOps();
key.interestOps(interests | SelectionKey.OP_READ);
readers.put(key, reader);
}
public void listen(int port, AcceptCallback acceptor) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
SelectionKey listenKey = server.register(selector, SelectionKey.OP_ACCEPT);
listeners.put(listenKey, acceptor);
keys.put(server, listenKey);
}
public void close(Channel channel) throws IOException {
SelectionKey key = keys.get(channel);
if (key != null) {
listeners.remove(key);
readers.remove(key);
keys.remove(channel);
key.cancel();
channel.close();
} else
throw new IllegalArgumentException();
}
public void start(ReactorCallback bootstrap) throws IOException {
bootstrap.invoke(this);
while (!keys.isEmpty())
react();
}
private void react() throws IOException {
selector.select();
for (Iterator<SelectionKey> keysIter = selector.selectedKeys().iterator();
keysIter.hasNext(); ) {
SelectionKey key = keysIter.next();
keysIter.remove();
maybeAccept(key);
maybeRead(key);
}
}
private void maybeRead(SelectionKey key) throws IOException {
ReadableCallback readableCallback = readers.get(key);
if (key.isReadable() && readableCallback != null) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(4096);
int bytesRead = channel.read(readBuffer);
if (bytesRead >= 0)
readableCallback.dataRead(readBuffer);
else
close(channel);
}
}
private void maybeAccept(SelectionKey key) throws IOException {
AcceptCallback acceptCallback = listeners.get(key);
if (key.isAcceptable() && acceptCallback != null) {
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
accepted.configureBlocking(false);
SelectionKey acceptedKey = accepted.register(selector, 0);
keys.put(accepted, acceptedKey);
acceptCallback.accepted(accepted);
}
}
}
package com.example.onepointeight;
import java.io.IOException;
public interface ReactorCallback {
public void invoke(Reactor reactor) throws IOException;
}
package com.example.onepointeight;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface ReadableCallback {
public void dataRead(ByteBuffer data) throws IOException;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment