Created
October 6, 2025 17:51
-
-
Save Raghav2211/fbecf71d71be97d76ed38f7dd0402aca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.IOException; | |
| import java.net.InetSocketAddress; | |
| import java.nio.ByteBuffer; | |
| import java.nio.channels.*; | |
| import java.util.Iterator; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| // telnet localhost 9000 | |
/**
 * Small demonstration of a multi-event-loop TCP echo server built on Java NIO.
 *
 * <p>One acceptor loop (run by the thread that calls {@link #start()}) accepts
 * connections on {@code PORT} and hands each one, round robin, to one of
 * {@code WORKER_COUNT} {@link WorkerEventLoop}s. Every worker owns its own
 * {@link Selector}; when a channel becomes readable the worker dispatches the
 * actual read / handle / echo step to a virtual thread.
 */
public class MultiEventLoopServer {
    private static final int PORT = 9000;
    private static final int WORKER_COUNT = 2;

    public static void main(String[] args) throws IOException {
        new MultiEventLoopServer().start();
    }

    /**
     * Binds the listening socket, starts the worker event loops and then runs
     * the acceptor loop on the calling thread. This method does not return
     * normally; it only exits by throwing.
     *
     * @throws IOException if the server socket or a selector cannot be opened,
     *                     bound or selected on
     */
    public void start() throws IOException {
        /*
         * ServerSocketChannel.open() creates the listening socket; it is not
         * bound to any port yet. The accept selector is opened in the same
         * try-with-resources so that both are closed if this method exits
         * with an exception.
         */
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
             Selector acceptSelector = Selector.open()) {
            /*
             * bind() associates the socket with PORT; from this point on
             * clients can connect. Non-blocking mode is required before the
             * channel can be registered with a selector.
             */
            serverChannel.bind(new InetSocketAddress(PORT));
            serverChannel.configureBlocking(false);
            System.out.println("🚀 Server running on port " + PORT);

            // This selector is used only to be notified about new connections.
            serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);

            // Create the worker event loops, each running on its own thread.
            WorkerEventLoop[] workers = new WorkerEventLoop[WORKER_COUNT];
            for (int i = 0; i < WORKER_COUNT; i++) {
                workers[i] = new WorkerEventLoop("worker-" + i);
                new Thread(workers[i]).start();
            }

            int next = 0;
            // Main acceptor loop.
            while (true) {
                acceptSelector.select(); // wait until a new connection is ready
                Iterator<SelectionKey> keys = acceptSelector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid() || !key.isAcceptable()) {
                        continue;
                    }
                    SocketChannel client = serverChannel.accept();
                    if (client == null) {
                        continue; // nothing was actually pending
                    }
                    // Distribute the client to a worker event loop (round robin).
                    WorkerEventLoop worker = workers[next];
                    next = (next + 1) % WORKER_COUNT;
                    try {
                        client.configureBlocking(false);
                        worker.register(client);
                    } catch (IOException e) {
                        // A single bad connection must not stop the acceptor loop.
                        System.err.println("⚠️ Failed to hand off client: " + e.getMessage());
                        closeQuietly(client);
                    }
                }
            }
        }
    }

    /** Closes {@code channel}, ignoring any failure; used on error paths only. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close() itself fails.
        }
    }

    /**
     * One event loop: a selector plus the loop that waits on it. Readable
     * channels are processed on virtual threads taken from {@code workerPool},
     * with OP_READ switched off for the channel while a task is in flight.
     */
    static class WorkerEventLoop implements Runnable {
        private final Selector selector;
        private final String name;
        private final ExecutorService workerPool = Executors.newVirtualThreadPerTaskExecutor();

        /**
         * @param name label used in log output
         * @throws IOException if the selector cannot be opened
         */
        WorkerEventLoop(String name) throws IOException {
            this.name = name;
            this.selector = Selector.open();
        }

        /**
         * Registers a new client for read events on this worker's selector.
         * Called from the acceptor thread, not from the worker's own thread.
         *
         * @param client an already non-blocking channel
         * @throws ClosedChannelException if {@code client} is closed
         */
        public void register(SocketChannel client) throws ClosedChannelException {
            System.out.println("Registering client on " + Thread.currentThread());
            // Register first, wake up second: a registration made while
            // select() is in progress is only seen by the NEXT selection
            // operation, so the wakeup has to come after the key exists.
            client.register(selector, SelectionKey.OP_READ);
            selector.wakeup();
            System.out.println("✅ Registered " + client.socket().getRemoteSocketAddress() + " to " + name);
        }

        /**
         * Runs the select loop until {@code select()} fails with an
         * {@link IOException}, which is logged before the method returns.
         */
        @Override
        public void run() {
            System.out.println("✅ Starting " + name + " on " + Thread.currentThread().getName());
            try {
                while (true) {
                    selector.select(); // wait for ready channels
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        try {
                            if (key.isValid() && key.isReadable()) {
                                SocketChannel client = (SocketChannel) key.channel();
                                // Temporarily disable OP_READ so the same channel is
                                // not dispatched again while a task is handling it.
                                key.interestOps(0);
                                // Handle the work asynchronously in a virtual thread.
                                workerPool.execute(() -> serve(client, key));
                            }
                        } catch (CancelledKeyException ignored) {
                            // The key was cancelled between select() and here;
                            // skip it instead of letting the loop thread die.
                        }
                    }
                }
            } catch (IOException e) {
                System.err.println("💥 Error in " + name + ": " + e.getMessage());
            }
        }

        /**
         * Performs one read on {@code client} and processes the result; runs on
         * a virtual thread. The channel is closed on end-of-stream or on any
         * failure; otherwise OP_READ is re-enabled and the selector is woken so
         * that the change takes effect.
         */
        private void serve(SocketChannel client, SelectionKey key) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(256);
                int read = client.read(buffer);
                if (read == -1) {
                    System.out.println("❌ Connection closed " + client.getRemoteAddress());
                    client.close();
                } else if (read > 0) {
                    handleClient(client, buffer);
                }
                // read == 0: nothing arrived; just re-arm OP_READ below.
            } catch (Exception e) {
                System.out.println("Error: " + e.getMessage());
                // Do not re-arm a broken connection; it would keep failing.
                closeQuietly(client);
            } finally {
                if (client.isOpen()) {
                    try {
                        // Re-enable OP_READ and wake the selector so it notices.
                        key.interestOps(SelectionKey.OP_READ);
                        key.selector().wakeup();
                    } catch (CancelledKeyException ignored) {
                        // The channel was closed concurrently; nothing to re-arm.
                    }
                }
            }
        }

        /**
         * Application-level handler: logs the received bytes as text and echoes
         * them back unchanged. (Roughly the place where request-handling code,
         * e.g. a web controller, would sit in a framework.)
         *
         * @param client channel the data came from and the echo is written to
         * @param buffer buffer in "just read into" state: position is the number
         *               of bytes received, not yet flipped
         */
        private void handleClient(SocketChannel client, ByteBuffer buffer) {
            try {
                // Decode exactly the bytes that were read, with an explicit charset.
                String msg = new String(buffer.array(), 0, buffer.position(),
                        java.nio.charset.StandardCharsets.UTF_8).trim();
                System.out.println("--------------------------------------------------------------------------");
                System.out.println("📩 [" + name + "] Received from " + client.getRemoteAddress());
                System.out.println("[");
                System.out.println(msg);
                System.out.println("]");
                System.out.println("--------------------------------------------------------------------------");
                // Echo back. A non-blocking write may accept only part of the
                // buffer, so keep writing until it is drained. A full design
                // would register OP_WRITE instead of yielding; for this small
                // fixed-size echo a yield loop keeps the example simple.
                buffer.flip();
                while (buffer.hasRemaining()) {
                    if (client.write(buffer) == 0) {
                        Thread.yield();
                    }
                }
            } catch (IOException e) {
                System.err.println("⚠️ Error handling client: " + e.getMessage());
                // The connection is unusable after a failed write.
                closeQuietly(client);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment