Skip to content

Instantly share code, notes, and snippets.

@Raghav2211
Created October 6, 2025 17:51
Show Gist options
  • Select an option

  • Save Raghav2211/fbecf71d71be97d76ed38f7dd0402aca to your computer and use it in GitHub Desktop.

Select an option

Save Raghav2211/fbecf71d71be97d76ed38f7dd0402aca to your computer and use it in GitHub Desktop.
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;
// Manual check: start the server, run `telnet localhost 9000`, and type a line; the server echoes it back.
/**
 * TCP echo server built from one acceptor loop plus {@code WORKER_COUNT} worker event loops.
 *
 * <p>The acceptor thread (the caller of {@link #start()}) owns the listening socket and hands
 * every accepted connection to a worker in round-robin order. Each worker owns its own
 * {@link Selector}; when a connection becomes readable the worker disables read interest for
 * that key and processes the read on a virtual thread, which re-enables read interest when done.
 */
public class MultiEventLoopServer {
    private static final int PORT = 9000;
    private static final int WORKER_COUNT = 2;

    public static void main(String[] args) throws IOException {
        new MultiEventLoopServer().start();
    }

    /**
     * Binds the listening socket, starts the worker loops and runs the accept loop on the
     * calling thread. This method does not return normally; it only exits by throwing.
     *
     * @throws IOException if the socket cannot be opened or bound, or if selecting/accepting
     *     on the listening socket fails
     */
    public void start() throws IOException {
        WorkerEventLoop[] workers = new WorkerEventLoop[WORKER_COUNT];
        /*
         * ServerSocketChannel.open() creates an unbound listening socket; nothing can connect
         * to it until bind() below associates it with PORT.
         * The accept selector is the event-notification hub for new connections. Both are
         * try-with-resources so they are closed when the accept loop fails.
         */
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
             Selector acceptSelector = Selector.open()) {
            // From this point on the kernel routes connections arriving on PORT to this socket.
            serverChannel.bind(new InetSocketAddress(PORT));
            serverChannel.configureBlocking(false);
            System.out.println("🚀 Server running on port " + PORT);
            serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);

            // Create worker event loops; each thread carries the loop's name for readable logs.
            for (int i = 0; i < WORKER_COUNT; i++) {
                String workerName = "worker-" + i;
                workers[i] = new WorkerEventLoop(workerName);
                new Thread(workers[i], workerName).start();
            }

            int next = 0;
            // Main acceptor loop
            while (true) {
                acceptSelector.select(); // wait until a new connection is ready
                Iterator<SelectionKey> keys = acceptSelector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid() || !key.isAcceptable()) {
                        continue;
                    }
                    SocketChannel client = serverChannel.accept();
                    if (client == null) {
                        continue; // non-blocking accept found nothing pending
                    }
                    try {
                        client.configureBlocking(false);
                        // Distribute the client to a worker event loop (round robin)
                        workers[next].register(client);
                    } catch (IOException | ClosedSelectorException e) {
                        // One bad hand-off must not take the whole server down or leak the socket.
                        System.err.println("⚠️ Could not hand off client: " + e.getMessage());
                        closeQuietly(client);
                    }
                    next = (next + 1) % WORKER_COUNT;
                }
            }
        } finally {
            // The acceptor is gone: stop the workers so their threads, selectors and
            // connections do not outlive it.
            for (WorkerEventLoop worker : workers) {
                if (worker != null) {
                    worker.shutdown();
                }
            }
        }
    }

    /** Closes {@code resource}, ignoring failures; used only on cleanup paths. */
    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close() itself fails during cleanup.
        }
    }

    /** A single worker event loop: one selector, serviced by the thread that runs {@link #run()}. */
    static class WorkerEventLoop implements Runnable {
        private static final int BUFFER_SIZE = 256;
        /** Consecutive zero-byte writes tolerated before the peer is considered stuck. */
        private static final int MAX_WRITE_STALLS = 1_000;
        private static final long WRITE_STALL_PAUSE_NANOS = 1_000_000L;

        private final Selector selector;
        private final String name;
        private final ExecutorService workerPool = Executors.newVirtualThreadPerTaskExecutor();
        private volatile boolean running = true;

        WorkerEventLoop(String name) throws IOException {
            this.name = name;
            this.selector = Selector.open();
        }

        /**
         * Registers a new client with this worker's selector for read events.
         *
         * <p>Called from the acceptor thread. The channel is registered first and the selector
         * woken afterwards: a registration made while a selection is in progress is only seen by
         * the next selection, so waking up before registering could leave the new key unnoticed
         * until some unrelated wakeup.
         *
         * @param client a non-blocking channel to be served by this loop
         * @throws ClosedChannelException if {@code client} is already closed
         */
        public void register(SocketChannel client) throws ClosedChannelException {
            System.out.println("Registering client on " + Thread.currentThread());
            client.register(selector, SelectionKey.OP_READ);
            selector.wakeup();
            System.out.println("✅ Registered " + client.socket().getRemoteSocketAddress() + " to " + name);
        }

        /** Asks the loop to stop; {@link #run()} then releases the selector and its connections. */
        void shutdown() {
            running = false;
            selector.wakeup();
        }

        @Override
        public void run() {
            System.out.println("✅ Starting " + name + " on " + Thread.currentThread().getName());
            try {
                while (running) {
                    selector.select(); // wait for ready channels
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        dispatch(key);
                    }
                }
            } catch (IOException | ClosedSelectorException e) {
                System.err.println("💥 Error in " + name + ": " + e.getMessage());
            } finally {
                release();
            }
        }

        /** Hands a readable key to a virtual thread, with read interest switched off meanwhile. */
        private void dispatch(SelectionKey key) {
            try {
                if (!key.isValid() || !key.isReadable()) {
                    return;
                }
                // Temporarily disable OP_READ so the selector does not report the same
                // unread data again while the virtual thread is still working on it.
                key.interestOps(0);
            } catch (CancelledKeyException e) {
                return; // the channel was closed in the meantime; nothing to serve
            }
            workerPool.execute(() -> serve(key));
        }

        /** Performs one read on the key's channel and echoes the data; runs on a virtual thread. */
        private void serve(SelectionKey key) {
            SocketChannel client = (SocketChannel) key.channel();
            try {
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                int read = client.read(buffer);
                if (read == -1) {
                    System.out.println("❌ Connection closed " + client.getRemoteAddress());
                    client.close();
                } else if (read > 0) {
                    buffer.flip(); // expose exactly the bytes that were just read
                    handleClient(client, buffer);
                }
            } catch (Exception e) {
                // Task boundary: after a failed read (e.g. connection reset) the channel would
                // keep reporting readable and fail again forever, so drop the connection.
                System.out.println("Error: " + e.getMessage());
                closeQuietly(client);
            } finally {
                rearm(key, client);
            }
        }

        /** Re-enables OP_READ for a still-open connection and wakes the selector to pick it up. */
        private static void rearm(SelectionKey key, SocketChannel client) {
            if (!client.isOpen()) {
                return;
            }
            try {
                key.interestOps(SelectionKey.OP_READ);
                key.selector().wakeup();
            } catch (CancelledKeyException e) {
                // The key is no longer registered, so the channel can never be served again.
                closeQuietly(client);
            }
        }

        /** Stops the virtual-thread executor and closes every registered channel and the selector. */
        private void release() {
            workerPool.shutdownNow();
            if (selector.isOpen()) {
                for (SelectionKey key : selector.keys()) {
                    closeQuietly(key.channel());
                }
                closeQuietly(selector);
            }
        }

        /**
         * Application-level handler (the counterpart of a controller method in a web framework):
         * logs the received message and echoes the bytes back. The connection is closed if
         * the echo cannot be delivered.
         *
         * @param client the connection the data arrived on
         * @param buffer the received bytes, already flipped for reading
         */
        private void handleClient(SocketChannel client, ByteBuffer buffer) {
            try {
                String msg = decode(buffer);
                System.out.println("--------------------------------------------------------------------------");
                System.out.println("📩 [" + name + "] Received from " + client.getRemoteAddress());
                System.out.println("[");
                System.out.println(msg);
                System.out.println("]");
                System.out.println("--------------------------------------------------------------------------");
                // Echo back
                writeFully(client, buffer);
            } catch (IOException e) {
                System.err.println("⚠️ Error handling client: " + e.getMessage());
                closeQuietly(client);
            }
        }

        /**
         * Decodes the readable bytes of {@code buffer} (position to limit) as UTF-8 and trims
         * surrounding whitespace and control characters. The buffer's position is left untouched.
         */
        static String decode(ByteBuffer buffer) {
            return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString().trim();
        }

        /**
         * Writes all remaining bytes of {@code buffer} to {@code channel}. A non-blocking write
         * may accept only part of the data, so the write is retried, pausing briefly whenever
         * nothing could be written.
         *
         * @throws IOException if the write fails, or if the channel accepts nothing for
         *     {@code MAX_WRITE_STALLS} consecutive attempts
         */
        static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
            int stalls = 0;
            while (buffer.hasRemaining()) {
                if (channel.write(buffer) > 0) {
                    stalls = 0;
                } else if (++stalls > MAX_WRITE_STALLS) {
                    throw new IOException("peer is not accepting data; " + buffer.remaining() + " bytes unsent");
                } else {
                    LockSupport.parkNanos(WRITE_STALL_PAUSE_NANOS);
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment