Skip to content

Instantly share code, notes, and snippets.

@c-rainstorm
Last active September 16, 2020 14:52
Show Gist options
  • Save c-rainstorm/e1bc86bea2ec971d3b09d8412814dda8 to your computer and use it in GitHub Desktop.
Save c-rainstorm/e1bc86bea2ec971d3b09d8412814dda8 to your computer and use it in GitHub Desktop.
package me.rainstorm.playground.reactor;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author baochen1.zhang
* @date 2020.09.16
*/
public class MultiReactorWithMultiWorkThread extends ReactorWithMultiWorkThread {
private final Selector[] selectors;
ExecutorService subReactorService;
public MultiReactorWithMultiWorkThread(int subReactorNum, int port) throws IOException {
super(port);
selectors = new Selector[subReactorNum];
subReactorService = Executors.newFixedThreadPool(subReactorNum);
for (int i = 0; i < subReactorNum; ++i) {
final Selector sel = Selector.open();
selectors[i] = sel;
subReactorService.submit(() -> selectLoop(sel));
}
}
@Override
protected Acceptor createAcceptor() {
return new RoundRobinAcceptor();
}
class RoundRobinAcceptor extends Acceptor {
private final AtomicInteger selectorRotater = new AtomicInteger(0);
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {
createHandler(getSelector0(), c);
}
} catch (IOException ex) { /* ... */ }
}
public Selector getSelector0() {
int idx = Math.abs(selectorRotater.getAndIncrement()) % selectors.length;
return selectors[idx];
}
}
}
package me.rainstorm.playground.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
* http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
*
* @author Doug Lea
*/
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException {
// select/poll/epoll
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
// socket()
serverSocket.socket()
// bind()
.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// listen
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(createAcceptor());
}
protected Acceptor createAcceptor() {
return new Acceptor();
}
@Override
public void run() {
selectLoop(selector);
}
protected void selectLoop(Selector sel) {
try {
while (!Thread.interrupted()) {
sel.select();
Set<SelectionKey> selected = sel.selectedKeys();
for (SelectionKey selectionKey : selected) {
dispatch(selectionKey);
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {
createHandler(selector, c);
}
} catch (IOException ex) { /* ... */ }
}
}
protected Handler createHandler(Selector selector, SocketChannel c) throws IOException {
return new Handler(selector, c);
}
static class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1000);
ByteBuffer output = ByteBuffer.allocate(1000);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
return true;
}
boolean outputIsComplete() {
return true;
}
void process() {
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) {
sk.cancel();
}
}
}
}
package me.rainstorm.playground.reactor;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
*
* @author Doug Lea
*/
public class ReactorWithMultiWorkThread extends Reactor {
public ReactorWithMultiWorkThread(int port) throws IOException {
super(port);
}
@Override
protected Handler createHandler(Selector selector, SocketChannel c) throws IOException {
return new HandlerWithThreadPool(selector, c);
}
static class HandlerWithThreadPool extends Reactor.Handler {
// uses util.concurrent thread pool
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 2);
static final int PROCESSING = 3;
HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {
super(sel, c);
}
@Override
synchronized void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
@Override
public void run() {
processAndHandOff();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment