Last active
September 16, 2020 14:52
-
-
Save c-rainstorm/e1bc86bea2ec971d3b09d8412814dda8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.rainstorm.playground.reactor; | |
import java.io.IOException; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* @author baochen1.zhang | |
* @date 2020.09.16 | |
*/ | |
public class MultiReactorWithMultiWorkThread extends ReactorWithMultiWorkThread { | |
private final Selector[] selectors; | |
ExecutorService subReactorService; | |
public MultiReactorWithMultiWorkThread(int subReactorNum, int port) throws IOException { | |
super(port); | |
selectors = new Selector[subReactorNum]; | |
subReactorService = Executors.newFixedThreadPool(subReactorNum); | |
for (int i = 0; i < subReactorNum; ++i) { | |
final Selector sel = Selector.open(); | |
selectors[i] = sel; | |
subReactorService.submit(() -> selectLoop(sel)); | |
} | |
} | |
@Override | |
protected Acceptor createAcceptor() { | |
return new RoundRobinAcceptor(); | |
} | |
class RoundRobinAcceptor extends Acceptor { | |
private final AtomicInteger selectorRotater = new AtomicInteger(0); | |
@Override | |
public void run() { | |
try { | |
SocketChannel c = serverSocket.accept(); | |
if (c != null) { | |
createHandler(getSelector0(), c); | |
} | |
} catch (IOException ex) { /* ... */ } | |
} | |
public Selector getSelector0() { | |
int idx = Math.abs(selectorRotater.getAndIncrement()) % selectors.length; | |
return selectors[idx]; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.rainstorm.playground.reactor; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.util.Set; | |
/** | |
* http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf | |
* | |
* @author Doug Lea | |
*/ | |
public class Reactor implements Runnable { | |
final Selector selector; | |
final ServerSocketChannel serverSocket; | |
public Reactor(int port) throws IOException { | |
// select/poll/epoll | |
selector = Selector.open(); | |
serverSocket = ServerSocketChannel.open(); | |
// socket() | |
serverSocket.socket() | |
// bind() | |
.bind(new InetSocketAddress(port)); | |
serverSocket.configureBlocking(false); | |
// listen | |
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); | |
sk.attach(createAcceptor()); | |
} | |
protected Acceptor createAcceptor() { | |
return new Acceptor(); | |
} | |
@Override | |
public void run() { | |
selectLoop(selector); | |
} | |
protected void selectLoop(Selector sel) { | |
try { | |
while (!Thread.interrupted()) { | |
sel.select(); | |
Set<SelectionKey> selected = sel.selectedKeys(); | |
for (SelectionKey selectionKey : selected) { | |
dispatch(selectionKey); | |
} | |
selected.clear(); | |
} | |
} catch (IOException ex) { /* ... */ } | |
} | |
void dispatch(SelectionKey k) { | |
Runnable r = (Runnable) (k.attachment()); | |
if (r != null) { | |
r.run(); | |
} | |
} | |
class Acceptor implements Runnable { | |
@Override | |
public void run() { | |
try { | |
SocketChannel c = serverSocket.accept(); | |
if (c != null) { | |
createHandler(selector, c); | |
} | |
} catch (IOException ex) { /* ... */ } | |
} | |
} | |
protected Handler createHandler(Selector selector, SocketChannel c) throws IOException { | |
return new Handler(selector, c); | |
} | |
static class Handler implements Runnable { | |
final SocketChannel socket; | |
final SelectionKey sk; | |
ByteBuffer input = ByteBuffer.allocate(1000); | |
ByteBuffer output = ByteBuffer.allocate(1000); | |
static final int READING = 0, SENDING = 1; | |
int state = READING; | |
Handler(Selector sel, SocketChannel c) | |
throws IOException { | |
socket = c; | |
c.configureBlocking(false); | |
sk = socket.register(sel, 0); | |
sk.attach(this); | |
sk.interestOps(SelectionKey.OP_READ); | |
sel.wakeup(); | |
} | |
boolean inputIsComplete() { | |
return true; | |
} | |
boolean outputIsComplete() { | |
return true; | |
} | |
void process() { | |
} | |
@Override | |
public void run() { | |
try { | |
if (state == READING) { | |
read(); | |
} else if (state == SENDING) { | |
send(); | |
} | |
} catch (IOException ex) { /* ... */ } | |
} | |
void read() throws IOException { | |
socket.read(input); | |
if (inputIsComplete()) { | |
process(); | |
state = SENDING; | |
// Normally also do first write now | |
sk.interestOps(SelectionKey.OP_WRITE); | |
} | |
} | |
void send() throws IOException { | |
socket.write(output); | |
if (outputIsComplete()) { | |
sk.cancel(); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.rainstorm.playground.reactor; | |
import java.io.IOException; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
/** | |
* http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf | |
* | |
* @author Doug Lea | |
*/ | |
public class ReactorWithMultiWorkThread extends Reactor { | |
public ReactorWithMultiWorkThread(int port) throws IOException { | |
super(port); | |
} | |
@Override | |
protected Handler createHandler(Selector selector, SocketChannel c) throws IOException { | |
return new HandlerWithThreadPool(selector, c); | |
} | |
static class HandlerWithThreadPool extends Reactor.Handler { | |
// uses util.concurrent thread pool | |
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 2); | |
static final int PROCESSING = 3; | |
HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException { | |
super(sel, c); | |
} | |
@Override | |
synchronized void read() throws IOException { | |
socket.read(input); | |
if (inputIsComplete()) { | |
state = PROCESSING; | |
pool.execute(new Processer()); | |
} | |
} | |
synchronized void processAndHandOff() { | |
process(); | |
state = SENDING; | |
sk.interestOps(SelectionKey.OP_WRITE); | |
} | |
class Processer implements Runnable { | |
@Override | |
public void run() { | |
processAndHandOff(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment