Last active
September 14, 2020 16:30
-
-
Save ochinchina/00b507d815e3ff7bc428 to your computer and use it in GitHub Desktop.
AsynchronousSocketChannel wrapper that supports fully asynchronous read and write operations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer; | |
import java.nio.channels.AsynchronousSocketChannel; | |
import java.nio.channels.CompletionHandler; | |
import java.util.LinkedList; | |
import java.util.concurrent.TimeUnit; | |
/**
 * Wraps the read and write operations of an {@link AsynchronousSocketChannel} so that a new
 * operation may be requested while an earlier one of the same kind has not finished yet.
 *
 * <p>Reads and writes are kept in two independent FIFO queues. Only the operation at the head of
 * a queue is handed to the channel; the next queued operation is started after the caller's
 * {@link CompletionHandler} for the head operation has returned.
 *
 * <p>The read/write methods never throw. Anything thrown while an operation is being started on
 * the channel is delivered to the caller's {@code failed()} method instead. Anything thrown by
 * the caller's handler itself is forwarded to the current thread's
 * {@link Thread.UncaughtExceptionHandler} and does not stop the queue.
 *
 * <p>Access to each queue is guarded by synchronizing on the queue object.
 *
 * @author Steven Ou
 */
public class AsynchronousSocketChannelWrapper {

    private final AsynchronousSocketChannel channel_;
    private final LinkedList< Operation > readOperations_ = new LinkedList< Operation >();
    private final LinkedList< Operation > writeOperations_ = new LinkedList< Operation >();

    /** A queued request that knows how to start itself on the channel. */
    private interface Operation {
        void execute();
    }

    /**
     * Forwards the outcome to the caller's handler and then advances the queue the operation
     * belongs to.
     */
    private static class ProxyCompletionHandler<V,A> implements CompletionHandler<V,A> {
        private final CompletionHandler<V,? super A> handler_;
        private final LinkedList< Operation > ops_;

        ProxyCompletionHandler( CompletionHandler<V,? super A> handler, LinkedList< Operation > ops ) {
            this.handler_ = handler;
            this.ops_ = ops;
        }

        @Override
        public void completed( V result, A attachment ) {
            try {
                handler_.completed( result, attachment );
            } catch( Throwable ex ) {
                // Must not propagate: the queue has to advance, and execute() would otherwise
                // treat it as a start-up failure and call failed() a second time.
                reportHandlerFailure( ex );
            }
            executeNextOperation( ops_ );
        }

        @Override
        public void failed( Throwable exc, A attachment ) {
            try {
                handler_.failed( exc, attachment );
            } catch( Throwable ex ) {
                reportHandlerFailure( ex );
            }
            executeNextOperation( ops_ );
        }
    }

    /**
     * Common part of every queued read/write: keeps the attachment and the proxy handler, and
     * converts anything thrown while starting the operation into a {@code failed()} callback.
     */
    private abstract static class QueuedOperation<V,A> implements Operation {
        final A attachment_;
        final CompletionHandler<V,A> proxy_;

        QueuedOperation( A attachment, CompletionHandler<V,? super A> handler, LinkedList< Operation > ops ) {
            this.attachment_ = attachment;
            this.proxy_ = new ProxyCompletionHandler<V,A>( handler, ops );
        }

        /** Issues the actual read or write call on the channel. */
        abstract void start();

        @Override
        public final void execute() {
            try {
                start();
            } catch( Throwable ex ) {
                // Going through the proxy also removes this operation from its queue.
                proxy_.failed( ex, attachment_ );
            }
        }
    }

    /**
     * Creates a wrapper around the given channel.
     *
     * @param channel the channel all queued operations are issued on
     */
    public AsynchronousSocketChannelWrapper( AsynchronousSocketChannel channel ) {
        this.channel_ = channel;
    }

    /**
     * @return the channel passed to the constructor
     */
    public AsynchronousSocketChannel getChannel() {
        return this.channel_;
    }

    /**
     * Queues a scattering read on the underlying socket channel. This method does not throw;
     * if starting the read fails, {@code handler.failed()} is called instead.
     *
     * @param dsts see AsynchronousSocketChannel.read() operation
     * @param offset see AsynchronousSocketChannel.read() operation
     * @param length see AsynchronousSocketChannel.read() operation
     * @param timeout see AsynchronousSocketChannel.read() operation
     * @param unit see AsynchronousSocketChannel.read() operation
     * @param attachment see AsynchronousSocketChannel.read() operation
     * @param handler see AsynchronousSocketChannel.read() operation
     */
    public <A> void read( final ByteBuffer[] dsts,
                          final int offset,
                          final int length,
                          final long timeout,
                          final TimeUnit unit,
                          A attachment,
                          CompletionHandler<Long,? super A> handler ) {
        addReadOperation( new QueuedOperation<Long,A>( attachment, handler, readOperations_ ) {
            @Override
            void start() {
                channel_.read( dsts, offset, length, timeout, unit, attachment_, proxy_ );
            }
        } );
    }

    /**
     * Queues a read into a single buffer without a timeout.
     *
     * @param dst see AsynchronousSocketChannel.read() operation
     * @param attachment see AsynchronousSocketChannel.read() operation
     * @param handler see AsynchronousSocketChannel.read() operation
     *
     * @see #read(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
     */
    public <A> void read( final ByteBuffer dst,
                          A attachment,
                          CompletionHandler<Integer,? super A> handler ) {
        addReadOperation( new QueuedOperation<Integer,A>( attachment, handler, readOperations_ ) {
            @Override
            void start() {
                channel_.read( dst, attachment_, proxy_ );
            }
        } );
    }

    /**
     * Queues a read into a single buffer with a timeout.
     *
     * @param dst see AsynchronousSocketChannel.read() operation
     * @param timeout see AsynchronousSocketChannel.read() operation
     * @param unit see AsynchronousSocketChannel.read() operation
     * @param attachment see AsynchronousSocketChannel.read() operation
     * @param handler see AsynchronousSocketChannel.read() operation
     *
     * @see #read(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
     */
    public <A> void read( final ByteBuffer dst,
                          final long timeout,
                          final TimeUnit unit,
                          A attachment,
                          CompletionHandler<Integer,? super A> handler ) {
        addReadOperation( new QueuedOperation<Integer,A>( attachment, handler, readOperations_ ) {
            @Override
            void start() {
                channel_.read( dst, timeout, unit, attachment_, proxy_ );
            }
        } );
    }

    /**
     * Queues a gathering write to the peer. This method does not throw; if starting the write
     * fails, {@code handler.failed()} is called instead.
     *
     * @param srcs see AsynchronousSocketChannel.write() operation
     * @param offset see AsynchronousSocketChannel.write() operation
     * @param length see AsynchronousSocketChannel.write() operation
     * @param timeout see AsynchronousSocketChannel.write() operation
     * @param unit see AsynchronousSocketChannel.write() operation
     * @param attachment see AsynchronousSocketChannel.write() operation
     * @param handler see AsynchronousSocketChannel.write() operation
     */
    public <A> void write( final ByteBuffer[] srcs,
                           final int offset,
                           final int length,
                           final long timeout,
                           final TimeUnit unit,
                           A attachment,
                           CompletionHandler<Long,? super A> handler ) {
        addWriteOperation( new QueuedOperation<Long,A>( attachment, handler, writeOperations_ ) {
            @Override
            void start() {
                channel_.write( srcs, offset, length, timeout, unit, attachment_, proxy_ );
            }
        } );
    }

    /**
     * Queues a write from a single buffer without a timeout.
     *
     * @param src see AsynchronousSocketChannel.write() operation
     * @param attachment see AsynchronousSocketChannel.write() operation
     * @param handler see AsynchronousSocketChannel.write() operation
     *
     * @see #write(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
     */
    public <A> void write( final ByteBuffer src,
                           A attachment,
                           CompletionHandler<Integer,? super A> handler ) {
        addWriteOperation( new QueuedOperation<Integer,A>( attachment, handler, writeOperations_ ) {
            @Override
            void start() {
                channel_.write( src, attachment_, proxy_ );
            }
        } );
    }

    /**
     * Queues a write from a single buffer with a timeout.
     *
     * @param src see AsynchronousSocketChannel.write() operation
     * @param timeout see AsynchronousSocketChannel.write() operation
     * @param unit see AsynchronousSocketChannel.write() operation
     * @param attachment see AsynchronousSocketChannel.write() operation
     * @param handler see AsynchronousSocketChannel.write() operation
     *
     * @see #write(ByteBuffer[], int, int, long, TimeUnit, Object, CompletionHandler)
     */
    public <A> void write( final ByteBuffer src,
                           final long timeout,
                           final TimeUnit unit,
                           A attachment,
                           CompletionHandler<Integer,? super A> handler ) {
        addWriteOperation( new QueuedOperation<Integer,A>( attachment, handler, writeOperations_ ) {
            @Override
            void start() {
                channel_.write( src, timeout, unit, attachment_, proxy_ );
            }
        } );
    }

    private void addReadOperation( Operation operation ) {
        addOperation( operation, readOperations_ );
    }

    private void addWriteOperation( Operation operation ) {
        addOperation( operation, writeOperations_ );
    }

    /**
     * Appends the operation to the queue and starts it right away when it is the only entry.
     * Otherwise it is started later by {@link #executeNextOperation(LinkedList)}.
     */
    private static void addOperation( Operation operation, LinkedList< Operation > ops ) {
        boolean startNow;
        synchronized( ops ) {
            ops.add( operation );
            startNow = ops.size() == 1;
        }
        // Started outside the lock so the queue is not held while calling into the channel.
        if( startNow ) {
            operation.execute();
        }
    }

    /**
     * Removes the finished head of the queue and starts the operation that follows it, if any.
     */
    private static void executeNextOperation( LinkedList< Operation > ops ) {
        Operation next = null;
        synchronized( ops ) {
            ops.removeFirst();
            if( !ops.isEmpty() ) {
                next = ops.getFirst();
            }
        }
        if( next != null ) {
            next.execute();
        }
    }

    /**
     * Hands an exception thrown by a caller-supplied handler to the current thread's
     * uncaught-exception handler, so that it is reported without being propagated.
     */
    private static void reportHandlerFailure( Throwable ex ) {
        try {
            Thread current = Thread.currentThread();
            current.getUncaughtExceptionHandler().uncaughtException( current, ex );
        } catch( Throwable ignored ) {
            // Reporting is best effort; a failing reporter must not stall the operation queue.
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.UnsupportedEncodingException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.AsynchronousSocketChannel; | |
import java.nio.channels.CompletionHandler; | |
public class EchoClient { | |
private AsynchronousSocketChannelWrapper sockChannel_; | |
public EchoClient( String server, int port ) throws IOException { | |
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); | |
channel.connect( new InetSocketAddress( server, port ), channel, new CompletionHandler<Void, AsynchronousSocketChannel >(){ | |
@Override | |
public void completed(Void result, AsynchronousSocketChannel channel ) { | |
sockChannel_ = new AsynchronousSocketChannelWrapper( channel ); | |
startRead(); | |
sayHello(); | |
} | |
@Override | |
public void failed(Throwable ex, AsynchronousSocketChannel channel) { | |
ex.printStackTrace(); | |
} | |
}); | |
} | |
private void startRead() { | |
ByteBuffer buf = ByteBuffer.allocate(1024); | |
sockChannel_.read( buf, buf, new CompletionHandler<Integer, ByteBuffer >() { | |
@Override | |
public void completed(Integer result, ByteBuffer buf) { | |
System.out.println( "received " + result + " bytes"); | |
buf.flip(); | |
byte[] b = new byte[result]; | |
buf.get( b ); | |
try { | |
System.out.println( new String( b, "UTF-8")); | |
} catch (UnsupportedEncodingException e1) { | |
e1.printStackTrace(); | |
} | |
try { | |
sockChannel_.getChannel().close(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
System.exit( 0 ); | |
} | |
@Override | |
public void failed(Throwable exc, ByteBuffer attachment) { | |
System.exit( -1 ); | |
} | |
}); | |
} | |
private void sayHello() { | |
try { | |
sockChannel_.write( ByteBuffer.wrap( "hello".getBytes("UTF-8") ), null, new CompletionHandler<Integer, Void>() { | |
@Override | |
public void completed(Integer result, Void attachment) { | |
System.out.println( "write " + result + " bytes"); | |
} | |
@Override | |
public void failed(Throwable exc, Void attachment) { | |
} | |
}); | |
} catch (UnsupportedEncodingException e) { | |
e.printStackTrace(); | |
} | |
} | |
public static void main( String...args ) { | |
try { | |
EchoClient client = new EchoClient( "127.0.0.1", 3355 ); | |
for( ; ; ) { | |
Thread.sleep( 1000 ); | |
} | |
}catch( Exception ex ) { | |
ex.printStackTrace(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.AsynchronousServerSocketChannel; | |
import java.nio.channels.AsynchronousSocketChannel; | |
import java.nio.channels.CompletionHandler; | |
public class EchoServer { | |
private AsynchronousServerSocketChannel serverChannel_; | |
public EchoServer( int port ) throws IOException { | |
serverChannel_ = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress( port )); | |
startAccept(); | |
} | |
private void startAccept() { | |
serverChannel_.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { | |
@Override | |
public void completed(AsynchronousSocketChannel sockChannel, Void arg) { | |
new EchoService( sockChannel ); | |
startAccept(); | |
} | |
@Override | |
public void failed(Throwable ex, Void arg) { | |
} | |
}); | |
} | |
private class EchoService { | |
private AsynchronousSocketChannelWrapper sockChannel_; | |
public EchoService( AsynchronousSocketChannel sockChannel ) { | |
this.sockChannel_ = new AsynchronousSocketChannelWrapper( sockChannel ); | |
startRead(); | |
} | |
private void startRead() { | |
ByteBuffer buf = ByteBuffer.allocate( 1024 ); | |
this.sockChannel_.read( buf, buf, createReadCompletionHandler(buf) ); | |
} | |
private CompletionHandler<Integer, ByteBuffer> createReadCompletionHandler( ByteBuffer buf ) { | |
return new CompletionHandler<Integer, ByteBuffer>() { | |
@Override | |
public void completed(Integer result, ByteBuffer buf ) { | |
System.out.println( "received " + result + " bytes"); | |
buf.flip(); | |
sockChannel_.write( buf, buf, createWriteCompletionHandler( buf ) ); | |
} | |
@Override | |
public void failed(Throwable ex, ByteBuffer buf) { | |
ex.printStackTrace(); | |
} | |
}; | |
} | |
private CompletionHandler<Integer, ByteBuffer> createWriteCompletionHandler( ByteBuffer buf ) { | |
return new CompletionHandler<Integer, ByteBuffer>() { | |
@Override | |
public void completed(Integer result, ByteBuffer attachment) { | |
System.out.println( "write " + result + " bytes"); | |
} | |
@Override | |
public void failed(Throwable exc, ByteBuffer attachment) { | |
} | |
}; | |
} | |
} | |
public static void main( String...args ) { | |
try { | |
new EchoServer( 3355 ); | |
for( ; ; ) { | |
Thread.sleep( 1000 ); | |
} | |
}catch( Exception ex ) { | |
ex.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment