Last active
November 26, 2017 11:51
-
-
Save wangzaixiang/352f9dbc58b90135d3415e2b4a387fce to your computer and use it in GitHub Desktop.
这是一个很好的selector例子,也发现了一个bug,当连接一次后, selector会进入不等待死循环。原因是 accept后注册了 OP_CONNECT错误,但这个导致错误返回是否应该算JDK的一个bug呢?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.demo; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.*; | |
import java.util.Iterator; | |
/** | |
* Created by ever on 2017/11/24. | |
*/ | |
public class SelectorDemo { | |
private static final int BUF_SIZE = 1024; | |
private static final int PORT = 8070; | |
private static final int TIMEOUT = 3000; | |
public static void main(String[] args) throws IOException, InterruptedException { | |
selector(); | |
} | |
/** | |
* TODO | |
* 多线程提高处理能力 | |
* @throws IOException | |
* @throws InterruptedException | |
*/ | |
private static void selector() throws IOException, InterruptedException { | |
ServerSocketChannel ssc = ServerSocketChannel.open(); | |
Selector selector = Selector.open(); | |
ssc.socket().bind(new InetSocketAddress(PORT)); | |
ssc.configureBlocking(false); | |
ssc.register(selector, SelectionKey.OP_ACCEPT); | |
while (true) { | |
if (selector.select(TIMEOUT) == 0) { | |
System.out.println("==="); | |
continue; | |
} | |
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); | |
while (iter.hasNext()) { | |
SelectionKey key = iter.next(); | |
if (key.isAcceptable()) { | |
handleAccept(key); | |
} | |
if (key.isConnectable()) { | |
handleConnect(key); | |
} | |
if (key.isWritable() && key.isValid()) { | |
handleWrite(key); | |
} | |
if (key.isReadable()) { | |
handleRead(key); | |
} | |
iter.remove(); | |
} | |
} | |
} | |
private static void handleAccept(SelectionKey key) throws IOException { | |
System.out.println("Handle accept event"); | |
ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); | |
SocketChannel sc = ssc.accept(); | |
sc.configureBlocking(false); | |
//CONNECT事件会造成客户端(telnet)连接上来后,select()方法行为怪异 | |
sc.register(key.selector(), SelectionKey.OP_CONNECT);//OP_READ | |
} | |
/** | |
* 模拟大数据量异步写入 | |
* @param key | |
* @throws IOException | |
* @throws InterruptedException | |
*/ | |
private static void handleWrite(SelectionKey key) throws IOException, InterruptedException { | |
System.out.println("Handle write event"); | |
SocketChannel sc = (SocketChannel) key.channel(); | |
ByteBuffer buffer = (ByteBuffer) key.attachment(); | |
int writes = sc.write(buffer); | |
System.out.println("write bytes:" + writes); | |
System.out.println("remain:" + buffer.remaining()); | |
if (buffer.remaining() == 0) { | |
key.interestOps(SelectionKey.OP_READ); | |
} | |
} | |
/** | |
* 服务端从来不会进入。。。 | |
* | |
* @param key | |
* @throws ClosedChannelException | |
*/ | |
public static void handleConnect(SelectionKey key) throws ClosedChannelException { | |
System.out.println("Handle connect event"); | |
System.out.println(key.interestOps()); | |
key.interestOps(SelectionKey.OP_READ); | |
System.out.println(key.interestOps()); | |
} | |
public static void handleRead(SelectionKey key) throws IOException { | |
System.out.println("Handle read event"); | |
SocketChannel sc = (SocketChannel) key.channel(); | |
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); | |
long bytesRead = sc.read(buffer); | |
System.out.println("read:" + bytesRead); | |
if (bytesRead == -1) { | |
System.out.println("Peer closed"); | |
sc.close(); | |
System.exit(0); | |
} | |
if (bytesRead > 0) { | |
buffer.flip(); | |
while (buffer.hasRemaining()) { | |
System.out.print((char) buffer.get()); | |
} | |
} | |
ByteBuffer writeBuffer = ByteBuffer.allocate(BUF_SIZE*BUF_SIZE); | |
for (int i = 0; i < BUF_SIZE*BUF_SIZE; i++) { | |
writeBuffer.put((byte)i); | |
} | |
writeBuffer.flip(); | |
key.attach(writeBuffer); | |
// won't have any effect this time, and will take effect after next select() invoked | |
key.interestOps(SelectionKey.OP_WRITE|SelectionKey.OP_READ); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment