Skip to content

Instantly share code, notes, and snippets.

@ochinchina
Last active September 20, 2023 09:52
Show Gist options
  • Save ochinchina/72cc23220dc8a933fc46 to your computer and use it in GitHub Desktop.
Save ochinchina/72cc23220dc8a933fc46 to your computer and use it in GitHub Desktop.
Async socket demo in java
package asyncsocket;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author Steven
*/
public class EchoClient {
public EchoClient( String host, int port, final String message, final AtomicInteger messageWritten, final AtomicInteger messageRead ) throws IOException {
//create a socket channel
AsynchronousSocketChannel sockChannel = AsynchronousSocketChannel.open();
//try to connect to the server side
sockChannel.connect( new InetSocketAddress(host, port), sockChannel, new CompletionHandler<Void, AsynchronousSocketChannel >() {
@Override
public void completed(Void result, AsynchronousSocketChannel channel ) {
//start to read message
startRead( channel,messageRead );
//write an message to server side
startWrite( channel, message, messageWritten );
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel channel) {
System.out.println( "fail to connect to server");
}
});
}
private void startRead( final AsynchronousSocketChannel sockChannel, final AtomicInteger messageRead ) {
final ByteBuffer buf = ByteBuffer.allocate(2048);
sockChannel.read( buf, sockChannel, new CompletionHandler<Integer, AsynchronousSocketChannel>(){
@Override
public void completed(Integer result, AsynchronousSocketChannel channel) {
//message is read from server
messageRead.getAndIncrement();
//print the message
System.out.println( "Read message:" + new String( buf.array()) );
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel channel) {
System.out.println( "fail to read message from server");
}
});
}
private void startWrite( final AsynchronousSocketChannel sockChannel, final String message, final AtomicInteger messageWritten ) {
ByteBuffer buf = ByteBuffer.allocate(2048);
buf.put(message.getBytes());
buf.flip();
messageWritten.getAndIncrement();
sockChannel.write(buf, sockChannel, new CompletionHandler<Integer, AsynchronousSocketChannel >() {
@Override
public void completed(Integer result, AsynchronousSocketChannel channel ) {
//after message written
//NOTHING TO DO
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel channel) {
System.out.println( "Fail to write the message to server");
}
});
}
public static void main( String...args ) {
try {
AtomicInteger messageWritten = new AtomicInteger( 0 );
AtomicInteger messageRead = new AtomicInteger( 0 );
for( int i = 0; i < 1000; i++ ) {
new EchoClient( "127.0.0.1", 3575, "echo test", messageWritten, messageRead );
}
while( messageRead.get() != 1000 ) {
Thread.sleep( 1000 );
System.out.println( "message write:" + messageWritten );
System.out.println( "message read:" + messageRead );
}
} catch (Exception ex) {
Logger.getLogger(EchoClient.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
package asyncsocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author Steven Ou
*/
public class EchoServer {
public EchoServer( String bindAddr, int bindPort ) throws IOException {
InetSocketAddress sockAddr = new InetSocketAddress(bindAddr, bindPort);
//create a socket channel and bind to local bind address
AsynchronousServerSocketChannel serverSock = AsynchronousServerSocketChannel.open().bind(sockAddr);
//start to accept the connection from client
serverSock.accept(serverSock, new CompletionHandler<AsynchronousSocketChannel,AsynchronousServerSocketChannel >() {
@Override
public void completed(AsynchronousSocketChannel sockChannel, AsynchronousServerSocketChannel serverSock ) {
//a connection is accepted, start to accept next connection
serverSock.accept( serverSock, this );
//start to read message from the client
startRead( sockChannel );
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel serverSock) {
System.out.println( "fail to accept a connection");
}
} );
}
private void startRead( AsynchronousSocketChannel sockChannel ) {
final ByteBuffer buf = ByteBuffer.allocate(2048);
//read message from client
sockChannel.read( buf, sockChannel, new CompletionHandler<Integer, AsynchronousSocketChannel >() {
/**
* some message is read from client, this callback will be called
*/
@Override
public void completed(Integer result, AsynchronousSocketChannel channel ) {
buf.flip();
// echo the message
startWrite( channel, buf );
//start to read next message again
startRead( channel );
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel channel ) {
System.out.println( "fail to read message from client");
}
});
}
private void startWrite( AsynchronousSocketChannel sockChannel, final ByteBuffer buf) {
sockChannel.write(buf, sockChannel, new CompletionHandler<Integer, AsynchronousSocketChannel >() {
@Override
public void completed(Integer result, AsynchronousSocketChannel channel) {
//finish to write message to client, nothing to do
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel channel) {
//fail to write message to client
System.out.println( "Fail to write message to client");
}
});
}
public static void main( String[] args ) {
try {
new EchoServer( "127.0.0.1", 3575 );
for( ; ; ) {
Thread.sleep(10*1000);
}
} catch (Exception ex) {
Logger.getLogger(EchoServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
@vishwalshah
Copy link

This program won't run

@Axrorxoja
Copy link

perfect, thank you for sharing )))

@Angus-Less
Copy link

With the EchoServer.java, when a user disconnects it causes a huge spike in CPU % and memory. Unsure as to why this occurs as it should just go back to the accept state right?

@irineu
Copy link

irineu commented Aug 27, 2022

With the EchoServer.java, when a user disconnects it causes a huge spike in CPU % and memory. Unsure as to why this occurs as it should just go back to the accept state right?

Then the client disconnect the: public void completed(Integer result, AsynchronousSocketChannel channel ) will be fired in a loop and increase CPU and memory.

If the Integer result be -1, it means of the client is disconnected, so you can force the close connection:

@Override
            public void completed(Integer result, AsynchronousSocketChannel channel  ) {
                buf.flip();

                if(result == -1){
                    try {
                        System.out.println("Client disconnected");
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return;
                }

                // echo the message
                startWrite( channel, buf );

                //start to read next message again
                startRead( channel );
            }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment