Last active
December 14, 2015 14:59
-
-
Save ojacobson/5104710 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.onepointeight; | |
import java.io.IOException; | |
import java.nio.channels.SocketChannel; | |
/**
 * Invoked when a new connection is available on a channel.
 *
 * <p>This is a single-method interface, so it can be supplied as a lambda
 * or method reference.
 */
@FunctionalInterface
public interface AcceptCallback {

    /**
     * Handles a newly accepted connection.
     *
     * @param channel the channel for the accepted connection
     * @throws IOException if handling the connection fails
     */
    void accepted(SocketChannel channel) throws IOException;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.onepointeight; | |
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class Reactor { | |
public static void main(String[] args) throws IOException { | |
Reactor.run((reactor) -> { | |
reactor.listen(3000, (client) -> { | |
reactor.read(client, (data) -> { | |
data.flip(); | |
reactor.write(client, data); | |
}); | |
}); | |
}); | |
} | |
private final Map<SelectableChannel, SelectionKey> keys = new HashMap<>(); | |
private final Map<SelectionKey, AcceptCallback> listeners = new HashMap<>(); | |
private final Map<SelectionKey, ReadableCallback> readers = new HashMap<>(); | |
private final Selector selector; | |
public static void run(ReactorCallback program) throws IOException { | |
Reactor r = new Reactor(); | |
r.start(program); | |
} | |
private Reactor() throws IOException { | |
selector = Selector.open(); | |
} | |
public void write(SocketChannel client, ByteBuffer data) throws IOException { | |
client.write(data); | |
} | |
public void read(SocketChannel client, ReadableCallback reader) { | |
SelectionKey key = keys.get(client); | |
int interests = key.interestOps(); | |
key.interestOps(interests | SelectionKey.OP_READ); | |
readers.put(key, reader); | |
} | |
public void listen(int port, AcceptCallback acceptor) throws IOException { | |
ServerSocketChannel server = ServerSocketChannel.open(); | |
server.configureBlocking(false); | |
server.bind(new InetSocketAddress(port)); | |
SelectionKey listenKey = server.register(selector, SelectionKey.OP_ACCEPT); | |
listeners.put(listenKey, acceptor); | |
keys.put(server, listenKey); | |
} | |
public void close(Channel channel) throws IOException { | |
SelectionKey key = keys.get(channel); | |
if (key != null) { | |
listeners.remove(key); | |
readers.remove(key); | |
keys.remove(channel); | |
key.cancel(); | |
channel.close(); | |
} else | |
throw new IllegalArgumentException(); | |
} | |
public void start(ReactorCallback bootstrap) throws IOException { | |
bootstrap.invoke(this); | |
while (!keys.isEmpty()) | |
react(); | |
} | |
private void react() throws IOException { | |
selector.select(); | |
for (Iterator<SelectionKey> keysIter = selector.selectedKeys().iterator(); | |
keysIter.hasNext(); ) { | |
SelectionKey key = keysIter.next(); | |
keysIter.remove(); | |
maybeAccept(key); | |
maybeRead(key); | |
} | |
} | |
private void maybeRead(SelectionKey key) throws IOException { | |
ReadableCallback readableCallback = readers.get(key); | |
if (key.isReadable() && readableCallback != null) { | |
SocketChannel channel = (SocketChannel) key.channel(); | |
ByteBuffer readBuffer = ByteBuffer.allocate(4096); | |
int bytesRead = channel.read(readBuffer); | |
if (bytesRead >= 0) | |
readableCallback.dataRead(readBuffer); | |
else | |
close(channel); | |
} | |
} | |
private void maybeAccept(SelectionKey key) throws IOException { | |
AcceptCallback acceptCallback = listeners.get(key); | |
if (key.isAcceptable() && acceptCallback != null) { | |
SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept(); | |
accepted.configureBlocking(false); | |
SelectionKey acceptedKey = accepted.register(selector, 0); | |
keys.put(accepted, acceptedKey); | |
acceptCallback.accepted(accepted); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.onepointeight; | |
import java.io.IOException; | |
public interface ReactorCallback { | |
public void invoke(Reactor reactor) throws IOException; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.onepointeight; | |
import java.io.IOException; | |
import java.nio.ByteBuffer; | |
/**
 * Receives data that has been read into a buffer.
 *
 * <p>This is a single-method interface, so it can be supplied as a lambda
 * or method reference.
 */
@FunctionalInterface
public interface ReadableCallback {

    /**
     * Handles a buffer of data that was read.
     *
     * @param data the buffer holding the bytes that were read
     * @throws IOException if handling the data fails
     */
    void dataRead(ByteBuffer data) throws IOException;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment