Skip to content

Instantly share code, notes, and snippets.

@junwen12221
Created December 1, 2016 06:14
Show Gist options
  • Save junwen12221/36d6f73e7860e8f2a06e5a2476963ede to your computer and use it in GitHub Desktop.
Save junwen12221/36d6f73e7860e8f2a06e5a2476963ede to your computer and use it in GitHub Desktop.
package twelve;
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Base class for a per-connection handler attached to a {@link SelectionKey}.
 *
 * <p>The constructor switches the channel to non-blocking mode, registers it for
 * {@code OP_READ}, attaches this handler to the key and then invokes
 * {@link #onConnected()}. {@link #run()} dispatches a ready key either to
 * {@link #doHandler()} (readable) or to the internal write flush (writable).
 *
 * <p>Outgoing data is kept in a FIFO queue; the buffer currently being written is
 * remembered in {@code writeBuffer} so that a partial write can be resumed when the
 * channel becomes writable again. All access to the queue and to {@code writeBuffer}
 * happens while {@code writingFlag} is held.
 *
 * <p>NOTE: {@link #onConnected()} is called from the constructor, i.e. before the
 * subclass constructor body and subclass field initializers have run.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public abstract class IOHandler implements Runnable {
    protected final SelectionKey selectionKey;
    protected final SocketChannel socketChannel;
    /** Buffer whose write is in flight (possibly partially written); {@code null} when idle. */
    private volatile ByteBuffer writeBuffer;
    /** Buffers waiting to be written, in submission order. Guarded by {@code writingFlag}. */
    private final LinkedList<ByteBuffer> writeQueue = new LinkedList<>();
    /** Simple spin flag serializing writers; {@code true} while a thread is writing. */
    private final AtomicBoolean writingFlag = new AtomicBoolean(false);
    protected volatile ByteBuffer readBuffer;

    /**
     * Registers the channel with the selector and notifies the subclass.
     *
     * @param selector      selector the channel is registered with
     * @param socketChannel accepted client channel
     * @throws IOException if configuring, registering or {@link #onConnected()} fails
     */
    public IOHandler(final Selector selector, SocketChannel socketChannel) throws IOException {
        socketChannel.configureBlocking(false);
        this.socketChannel = socketChannel;
        selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
        readBuffer = ByteBuffer.allocateDirect(100);
        // bind this session to the key so the reactor can find it
        selectionKey.attach(this);
        this.onConnected();
    }

    /** Called once from the constructor after the channel has been registered. */
    public abstract void onConnected() throws IOException;

    /** Called from {@link #run()} when the key is readable. */
    public abstract void doHandler() throws IOException;

    /**
     * Queues {@code data} for sending and writes as much as the channel accepts right now.
     * Whatever cannot be written immediately is flushed later on {@code OP_WRITE}.
     *
     * @param data bytes to send; the array is wrapped, not copied
     * @throws IOException if writing to the channel fails
     */
    public void writeData(byte[] data) throws IOException {
        acquireWriteFlag();
        try {
            // Always enqueue first: this keeps ordering correct whether or not an
            // earlier buffer is still in flight.
            writeQueue.add(ByteBuffer.wrap(data));
            flushPendingWrites();
        } finally {
            releaseWriteFlag();
        }
    }

    /**
     * Handles one readiness notification. Any exception cancels the key and closes the channel.
     */
    @Override
    public void run() {
        try {
            if (!selectionKey.isValid()) {
                // Stale task for a connection that has already been closed.
                return;
            }
            if (selectionKey.isReadable()) {
                doHandler();
            } else if (selectionKey.isWritable()) {
                doWriteData();
            }
        } catch (Exception e) {
            e.printStackTrace();
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    /** Resumes writing after the channel reported it is writable again. */
    private void doWriteData() throws IOException {
        acquireWriteFlag();
        try {
            flushPendingWrites();
        } finally {
            releaseWriteFlag();
        }
    }

    /**
     * Writes the in-flight buffer and then the queued buffers until the channel stops
     * accepting data or nothing is left. Must be called with {@code writingFlag} held.
     */
    private void flushPendingWrites() throws IOException {
        while (true) {
            ByteBuffer current = writeBuffer;
            if (current == null) {
                current = writeQueue.pollFirst();
                if (current == null) {
                    System.out.println(" .... write finished ,no more data ");
                    selectionKey.interestOps(
                            (selectionKey.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
                    return;
                }
                // Buffers come from ByteBuffer.wrap and are already positioned for
                // reading; they must NOT be flipped (that would set limit to 0).
                writeBuffer = current;
            }
            int writed = socketChannel.write(current);
            System.out.println("write " + writed);
            if (current.hasRemaining()) {
                System.out.println("writed " + writed + " not write finished ,remains " + current.remaining());
                // Keep the remainder in writeBuffer and ask to be told when we can continue.
                selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
                return;
            }
            System.out.println(" block write finished ");
            writeBuffer = null;
        }
    }

    /** Spins until this thread owns the write flag. */
    private void acquireWriteFlag() {
        while (!writingFlag.compareAndSet(false, true)) {
            // wait until release, but let other threads run meanwhile
            Thread.yield();
        }
    }

    /** Releases the write flag taken by {@link #acquireWriteFlag()}. */
    private void releaseWriteFlag() {
        writingFlag.set(false);
    }
}
package twelve;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Pattern;
/**
 * Helper for running a local command and capturing its console output as text.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public class LocalCmandUtil {
    /** Splits a command line on runs of whitespace. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /**
     * Runs {@code cmd} and returns its output decoded as GBK.
     *
     * @param cmd command line; split on whitespace into program and arguments
     * @return the output with every line terminated by {@code "\r\n"}; if anything
     *         fails, the exception's {@code toString()} is appended instead of thrown
     */
    public static String callCmdAndgetResult(String cmd) {
        return callCmdAndgetResult(cmd, "GBK");
    }

    /**
     * Runs {@code cmd} and returns its combined stdout/stderr decoded with the given charset.
     *
     * <p>No shell quoting is understood: the command line is split on whitespace only.
     * The caller is responsible for not passing untrusted text to a shell through this method.
     *
     * @param cmd         command line; split on whitespace into program and arguments
     * @param charsetName name of the charset used to decode the process output
     * @return the output with every line terminated by {@code "\r\n"}; if anything
     *         fails, the exception's {@code toString()} is appended instead of thrown
     */
    public static String callCmdAndgetResult(String cmd, String charsetName) {
        StringBuilder result = new StringBuilder();
        Process process = null;
        try {
            // trim + "\\s+" avoids empty arguments caused by leading or repeated whitespace
            ProcessBuilder pb = new ProcessBuilder(WHITESPACE.split(cmd.trim()));
            // Merge stderr into stdout: otherwise an unread stderr pipe can fill up
            // and block the child forever, and error text would be lost.
            pb.redirectErrorStream(true);
            process = pb.start();
            try (InputStream is = process.getInputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(is, charsetName))) {
                String line;
                while ((line = br.readLine()) != null) {
                    result.append(line).append("\r\n");
                }
            }
            process.waitFor();
        } catch (InterruptedException e) {
            // restore the interrupt flag for whoever owns this thread
            Thread.currentThread().interrupt();
            result.append(e.toString());
        } catch (Exception e) {
            result.append(e.toString());
        } finally {
            if (process != null) {
                // no-op after a normal exit; cleans up the child on failure
                process.destroy();
            }
        }
        return result.toString();
    }
}
package twelve;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Entry point: creates one {@link MyNIOReactor} per available processor and an
 * {@link NIOAcceptor} that hands accepted connections to them.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public class Main {
    /** Port used when none is given on the command line. */
    private static final int DEFAULT_PORT = 23;

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the listen port (defaults to 23)
     * @throws IOException if a selector cannot be opened or the port cannot be bound
     * @throws IllegalArgumentException if {@code args[0]} is not a valid port number
     */
    public static void main(String[] args) throws IOException {
        int port = resolvePort(args);
        ExecutorService executor = Executors.newCachedThreadPool();
        MyNIOReactor[] reactors = new MyNIOReactor[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < reactors.length; i++) {
            reactors[i] = new MyNIOReactor(executor);
        }
        // Bind before starting any thread: if the bind fails (e.g. privileged port)
        // no non-daemon reactor thread is left running to keep the JVM alive.
        NIOAcceptor acceptor = new NIOAcceptor(port, reactors);
        for (MyNIOReactor reactor : reactors) {
            reactor.start();
        }
        acceptor.start();
    }

    /** Returns the port from {@code args[0]} or the default when no argument is given. */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        final int port;
        try {
            port = Integer.parseInt(args[0].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port: " + args[0], e);
        }
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
package twelve;
import com.sun.org.apache.bcel.internal.generic.Select;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
 * Reactor thread: owns one {@link Selector} and submits the {@link IOHandler}
 * attached to every selected key to the shared executor.
 *
 * <p>NOTE(review): a key stays ready until its handler has consumed the event, so the
 * same handler may be submitted again before an earlier task has run — confirm that
 * handlers tolerate this.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public class MyNIOReactor extends Thread {
    final Selector selector;
    final ExecutorService executor;

    /**
     * @param executor executor that runs the handlers of ready keys
     * @throws IOException if the selector cannot be opened
     */
    public MyNIOReactor(ExecutorService executor) throws IOException {
        this.selector = Selector.open();
        this.executor = executor;
    }

    /**
     * Creates a {@link TelnetIOHandler} for the channel, which registers it with this
     * reactor's selector. May be called from a thread other than the reactor thread.
     *
     * @param socketChannel newly accepted client channel
     * @throws IOException if the handler cannot be set up
     */
    public void registerNewClient(SocketChannel socketChannel) throws IOException {
        System.out.println(" registered by actor " + this.getName());
        // Registration may block while a select() is in progress on this selector;
        // waking it up first is a best-effort way to shorten that wait.
        selector.wakeup();
        new TelnetIOHandler(selector, socketChannel);
    }

    /** Select loop; ends when the thread is interrupted or the selector is closed. */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            if (!selector.isOpen()) {
                break;
            }
            Set<SelectionKey> selectionKeys;
            try {
                selector.select(500);
                selectionKeys = selector.selectedKeys();
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
            for (SelectionKey selectedKey : selectionKeys) {
                Object attachment = selectedKey.attachment();
                if (!(attachment instanceof IOHandler)) {
                    // The key can become ready between register() and attach() in the
                    // handler's constructor; it will be selected again on the next pass.
                    continue;
                }
                try {
                    this.executor.execute((IOHandler) attachment);
                } catch (RuntimeException e) {
                    // e.g. executor shut down or saturated: must not kill the reactor thread
                    e.printStackTrace();
                }
            }
            selectionKeys.clear();
        }
    }
}
package twelve;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Acceptor thread: listens on a server socket and hands every accepted connection
 * to a randomly chosen {@link MyNIOReactor}.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public class NIOAcceptor extends Thread {
    private final ServerSocketChannel serverSocketChannel;
    private final MyNIOReactor[] reactors;
    private final Selector selector;

    /**
     * Opens a non-blocking server socket, registers it for {@code OP_ACCEPT} and binds it.
     *
     * @param bindPort port to listen on
     * @param reactors reactors that accepted connections are distributed to; must not be empty
     * @throws IOException if the channel or selector cannot be opened or the port cannot be bound
     * @throws IllegalArgumentException if {@code reactors} is {@code null} or empty
     */
    public NIOAcceptor(int bindPort, MyNIOReactor[] reactors) throws IOException {
        if (reactors == null || reactors.length == 0) {
            // Fail fast instead of dying later inside the accept loop.
            throw new IllegalArgumentException("at least one reactor is required");
        }
        this.reactors = reactors;
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        InetSocketAddress address = new InetSocketAddress(bindPort);
        serverSocketChannel.socket().bind(address);
        System.out.println("started at " + address);
    }

    /** Accept loop: waits for {@code OP_ACCEPT} and dispatches new connections. */
    @Override
    public void run() {
        while (true) {
            Set<SelectionKey> selectionKeys;
            try {
                selector.select(500);
                selectionKeys = selector.selectedKeys();
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
            for (SelectionKey selectedKey : selectionKeys) {
                if (selectedKey.isValid() && selectedKey.isAcceptable()) {
                    acceptConnection();
                }
            }
            selectionKeys.clear();
        }
    }

    /** Accepts one pending connection, if any, and registers it with a random reactor. */
    private void acceptConnection() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // Non-blocking accept: nothing was actually pending.
                return;
            }
            System.out.println("Connection Accepted " + socketChannel.getRemoteAddress());
            int nextReactor = ThreadLocalRandom.current().nextInt(reactors.length);
            reactors[nextReactor].registerNewClient(socketChannel);
        } catch (IOException | RuntimeException e) {
            e.printStackTrace();
            if (socketChannel != null) {
                // Do not leak the connection when handing it over failed.
                try {
                    socketChannel.close();
                } catch (IOException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}
package twelve;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
/**
 * Line-oriented telnet session handler.
 *
 * <p>Bytes are accumulated in {@code readBuffer}; a line ends at a carriage return
 * (byte 13). {@code "quit"} closes the connection, a line starting with {@code "dir"}
 * is executed through {@code cmd /c} and its output returned, anything else is
 * answered with a block of filler text.
 *
 * Created by Administrator on 2016/11/29 0029.
 */
public class TelnetIOHandler extends IOHandler {
    /** Index in {@code readBuffer} where the next, not yet delivered line starts. */
    private int lastMessagePos;
    private static final byte[] welcomeInfo = ("Welcome Karakapi ship...\r\n"
            + "1: find keyword in files\r\n"
            + "2: quit\r\nTelnet>").getBytes();
    /** Characters that cmd.exe treats specially; refused in commands taken from the client. */
    private static final String SHELL_METACHARACTERS = "&|<>^%\"()";

    /**
     * @param selector      selector the channel is registered with
     * @param socketChannel accepted client channel
     * @throws IOException if the base handler cannot be set up
     */
    public TelnetIOHandler(Selector selector, SocketChannel socketChannel) throws IOException {
        super(selector, socketChannel);
    }

    /** Logs the peer address and sends the welcome banner. */
    @Override
    public void onConnected() throws IOException {
        System.out.println("connected from " + this.socketChannel.getRemoteAddress());
        this.writeData(welcomeInfo);
    }

    /**
     * Reads available bytes and processes every complete line found in the buffer.
     *
     * @throws IOException if reading, writing or closing the channel fails
     */
    @Override
    public void doHandler() throws IOException {
        int bytesRead = socketChannel.read(readBuffer);
        if (bytesRead < 0) {
            // Peer closed the connection; without this the key stays readable forever.
            closeConnection();
            return;
        }
        String readedLine = nextLine();
        if (readedLine != null) {
            // Stop reading while the command is handled; reading is switched back on
            // once the response has been written completely.
            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
            while (readedLine != null) {
                processCommand(readedLine);
                if (!socketChannel.isOpen()) {
                    return;
                }
                readedLine = nextLine();
            }
        }
        reclaimBufferSpace();
    }

    /**
     * Extracts the next CR-terminated line from {@code readBuffer} without disturbing
     * the buffer's write position, so bytes received after the CR are kept.
     *
     * @return the line without its terminator, or {@code null} if no complete line is buffered
     */
    private String nextLine() {
        int readEndPos = readBuffer.position();
        // Skip the LF that follows the CR of the previous line ("\r\n").
        while (lastMessagePos < readEndPos && readBuffer.get(lastMessagePos) == '\n') {
            lastMessagePos++;
        }
        for (int i = lastMessagePos; i < readEndPos; i++) {
            if (readBuffer.get(i) == 13) {
                byte[] lineBytes = new byte[i - lastMessagePos];
                for (int j = 0; j < lineBytes.length; j++) {
                    // absolute get: does not move the buffer position
                    lineBytes[j] = readBuffer.get(lastMessagePos + j);
                }
                lastMessagePos = i + 1;
                String line = new String(lineBytes);
                System.out.println("received line ,lenth:" + line.length() + " value " + line);
                return line;
            }
        }
        return null;
    }

    /** Drops already delivered bytes once the buffer is more than half full. */
    private void reclaimBufferSpace() {
        if (readBuffer.position() > readBuffer.capacity() / 2) {
            System.out.println(" rewind read byte buffer ,get more space " + readBuffer.position());
            readBuffer.limit(readBuffer.position());
            readBuffer.position(lastMessagePos);
            readBuffer.compact();
            lastMessagePos = 0;
        }
        if (!readBuffer.hasRemaining()) {
            // An unterminated line filled the whole buffer; discard it, otherwise
            // read() would return 0 forever and the connection would be stuck.
            System.out.println(" line too long ,discard " + readBuffer.position() + " bytes");
            readBuffer.clear();
            lastMessagePos = 0;
        }
    }

    /** Cancels the key and closes the channel. */
    private void closeConnection() throws IOException {
        this.selectionKey.cancel();
        this.socketChannel.close();
    }

    /**
     * Executes one command line and sends the response.
     *
     * @param readedLine the line received from the client, without terminator
     * @throws IOException if the response cannot be written or the channel cannot be closed
     */
    private void processCommand(String readedLine) throws IOException {
        String command = readedLine.trim();
        if ("quit".equals(command)) {
            closeConnection();
            return;
        }
        byte[] data;
        if (command.startsWith("dir")) {
            if (containsShellMetacharacter(command)) {
                // The line comes from the network and is handed to cmd.exe: refuse
                // anything that could chain or redirect commands.
                data = "illegal character in command\r\nTelnet>".getBytes("GBK");
            } else {
                String output = LocalCmandUtil.callCmdAndgetResult("cmd /c " + command);
                data = (output + "\r\nTelnet>").getBytes("GBK");
            }
        } else {
            // Unknown command: answer with a block of filler text followed by the prompt.
            data = new byte[1024 * 10];
            ByteBuffer tempBuf = ByteBuffer.wrap(data);
            for (int i = 0; i < tempBuf.capacity() - 10; i++) {
                tempBuf.put((byte) ('a' + i % 25));
            }
            tempBuf.put("\r\nTelnet>".getBytes());
        }
        this.writeData(data);
    }

    /** Returns {@code true} if {@code command} contains a character cmd.exe interprets specially. */
    private static boolean containsShellMetacharacter(String command) {
        for (int i = 0; i < command.length(); i++) {
            if (SHELL_METACHARACTERS.indexOf(command.charAt(i)) >= 0) {
                return true;
            }
        }
        return false;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment