Skip to content

Instantly share code, notes, and snippets.

@rakeshopensource
Created September 14, 2023 15:57
Show Gist options
  • Save rakeshopensource/6a9935e4e4c3f6343670bdf6bcbf6084 to your computer and use it in GitHub Desktop.
Save rakeshopensource/6a9935e4e4c3f6343670bdf6bcbf6084 to your computer and use it in GitHub Desktop.
Sample basic single threaded event loop in java implemented using non-blocking IO
package org.rakeshopensource.systemdesign;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.*;
public class SingleThreadedHttpServer {
private static BlockingQueue<SocketChannel> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(8000));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
Thread ioThread = new Thread(() -> {
while (true) {
try {
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
client.configureBlocking(false);
queue.add(client);
key.cancel();
}
keys.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
Thread workerThread = new Thread(() -> {
while (true) {
try {
SocketChannel client = queue.take();
processRequest(client);
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
});
ioThread.start();
workerThread.start();
}
private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
private static void processRequest(SocketChannel client) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = client.read(buffer);
if (read == -1) {
client.close();
return;
}
String request = new String(buffer.array(), 0, read).trim();
String[] requestLines = request.split("\n");
String[] requestLineParts = requestLines[0].split(" ");
String httpMethod = requestLineParts[0];
String url = requestLineParts[1];
String responseBody;
if ("GET".equals(httpMethod)) {
if ("/data".equals(url)) {
responseBody = "Response from /data endpoint";
} else if ("/api".equals(url)) {
responseBody = "Response from /api endpoint";
} else {
responseBody = "Unknown endpoint";
}
} else if ("POST".equals(httpMethod)) {
responseBody = "POST request received";
} else {
responseBody = "Unsupported HTTP method";
}
answerWithHttp(client, responseBody);
}
private static void answerWithHttp(SocketChannel client, String responseBody) throws IOException {
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: " + responseBody.getBytes(StandardCharsets.UTF_8).length + "\r\n" +
"\r\n" +
responseBody;
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
client.write(responseBuffer);
client.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment