Skip to content

Instantly share code, notes, and snippets.

@cenodis
Created April 15, 2024 21:48
Show Gist options
  • Save cenodis/57c5239a879821211ec0b9ca9aa4f863 to your computer and use it in GitHub Desktop.
Save cenodis/57c5239a879821211ec0b9ca9aa4f863 to your computer and use it in GitHub Desktop.
package me.cenodis;
import java.io.Closeable;
import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.newsclub.net.unix.AFUNIXServerSocket;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;
/**
 * Demonstrates the difference in pinning behaviour between
 * {@code java.nio.channels.SocketChannel} and {@code org.newsclub.net.unix.AFUNIXSocket}.
 *
 * <p>Make sure to run this class with a single carrier thread for all virtual threads:
 * {@code -Djdk.virtualThreadScheduler.maxPoolSize=1}
 *
 * <p>The program starts a server and a client on virtual threads, waits three seconds while
 * the client is reading, interrupts both threads, and prints whether each one is still alive.
 */
public class SocketTest {
    // change this to switch between sockets via java.nio.channels and org.newsclub.net.unix
    private static final Backend BACKEND = Backend.JUNIXSOCKET;
    private static final Path SOCKET_PATH = Path.of("/", "tmp", "test_socket");
    private static final UnixDomainSocketAddress SOCKET_CHANNEL_ADDR = UnixDomainSocketAddress.of(SOCKET_PATH);
    // Counted down by the server right after bind, so the client is only started afterwards.
    private static final CountDownLatch SOCKET_READY = new CountDownLatch(1);

    /** Socket implementation exercised by the demo. */
    private enum Backend {
        SOCKET_CHANNEL,
        JUNIXSOCKET,
    }

    /**
     * Closes every resource, reporting (instead of silently dropping) any failure on stderr.
     * Cleanup stays best-effort: one failing close does not prevent the remaining ones.
     *
     * @param resources the resources to close
     */
    private static void closeAll(Iterable<? extends Closeable> resources) {
        for (var resource : resources) {
            try {
                resource.close();
            } catch (IOException e) {
                System.err.println("Failed to close " + resource + ": " + e);
            }
        }
    }

    /** Deletes the socket file if it exists; a failure is reported on stderr but not rethrown. */
    private static void deleteSocketFile() {
        try {
            Files.deleteIfExists(SOCKET_PATH);
        } catch (IOException e) {
            System.err.println("Failed to delete " + SOCKET_PATH + ": " + e);
        }
    }

    /**
     * Blocks until the server has signalled {@link #SOCKET_READY}.
     *
     * <p>The server thread is polled while waiting so that a server which died before binding
     * (and therefore never counts the latch down) produces an error instead of an endless wait.
     *
     * @param server the thread running the server loop
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the server thread ended without signalling readiness
     */
    private static void awaitServerReady(Thread server) throws InterruptedException {
        while (!SOCKET_READY.await(100, TimeUnit.MILLISECONDS)) {
            // Check liveness first: once the thread is dead the latch count can no longer change.
            if (!server.isAlive() && SOCKET_READY.getCount() > 0) {
                throw new IllegalStateException("Server thread terminated before the socket was bound");
            }
        }
    }

    /**
     * Server loop for the {@code java.nio.channels} backend: binds a UNIX-domain
     * {@link ServerSocketChannel}, signals readiness and then accepts connections forever,
     * keeping every accepted channel open until the loop ends.
     *
     * <p>An {@link IOException} raised while this thread's interrupt flag is set is treated as a
     * requested shutdown (the flag is cleared); any other {@link IOException} is rethrown wrapped
     * in a {@link RuntimeException}.
     */
    private static void serveSocketChannel() {
        var sockets = new ArrayList<SocketChannel>();
        try (var channel = ServerSocketChannel.open(StandardProtocolFamily.UNIX)) {
            channel.bind(SOCKET_CHANNEL_ADDR);
            System.out.println("Server socket channel created");
            SOCKET_READY.countDown();
            while (true) {
                var socket = channel.accept();
                sockets.add(socket);
            }
        } catch (IOException e) {
            if (!Thread.interrupted()) {
                throw new RuntimeException(e);
            }
            System.out.println("Server closed by interrupt");
        } finally {
            closeAll(sockets);
        }
    }

    /**
     * Client for the {@code java.nio.channels} backend: connects to the server socket and issues
     * a single read into a 10-byte buffer.
     *
     * <p>Interrupt handling mirrors the server: an {@link IOException} with the interrupt flag
     * set counts as a clean shutdown, anything else is rethrown as a {@link RuntimeException}.
     */
    private static void readSocketChannel() {
        try (var channel = SocketChannel.open(SOCKET_CHANNEL_ADDR)) {
            var buffer = ByteBuffer.allocate(10);
            System.out.println("Client channel reading from socket");
            channel.read(buffer);
        } catch (IOException e) {
            if (!Thread.interrupted()) {
                throw new RuntimeException(e);
            }
            System.out.println("Client closed by interrupt");
        }
    }

    /**
     * Server loop for the junixsocket backend: binds an {@link AFUNIXServerSocket} to
     * {@link #SOCKET_PATH}, signals readiness and then accepts connections forever, keeping
     * every accepted socket open until the loop ends.
     *
     * <p>An {@link IOException} raised while this thread's interrupt flag is set is treated as a
     * requested shutdown (the flag is cleared); any other {@link IOException} is rethrown wrapped
     * in a {@link RuntimeException}.
     */
    private static void serveUnixSocket() {
        var sockets = new ArrayList<AFUNIXSocket>();
        try (var socket = AFUNIXServerSocket.newInstance()) {
            socket.bind(AFUNIXSocketAddress.of(SOCKET_PATH));
            System.out.println("Server unix socket created");
            SOCKET_READY.countDown();
            while (true) {
                var client = socket.accept();
                sockets.add(client);
            }
        } catch (IOException e) {
            if (!Thread.interrupted()) {
                throw new RuntimeException(e);
            }
            System.out.println("Server closed by interrupt");
        } finally {
            closeAll(sockets);
        }
    }

    /**
     * Client for the junixsocket backend: connects an {@link AFUNIXSocket} to
     * {@link #SOCKET_PATH} and issues a single read into a 10-byte array via its input stream.
     *
     * <p>Interrupt handling mirrors the server: an {@link IOException} with the interrupt flag
     * set counts as a clean shutdown, anything else is rethrown as a {@link RuntimeException}.
     */
    private static void readUnixSocket() {
        try (var socket = AFUNIXSocket.newInstance()) {
            socket.connect(AFUNIXSocketAddress.of(SOCKET_PATH));
            var buffer = new byte[10];
            System.out.println("Client unix socket reading from socket");
            try (var input = socket.getInputStream()) {
                input.read(buffer);
            }
        } catch (IOException e) {
            if (!Thread.interrupted()) {
                throw new RuntimeException(e);
            }
            System.out.println("Client closed by interrupt");
        }
    }

    /**
     * Runs the demo for the configured {@link #BACKEND}: starts the server and, once it is
     * bound, the client (both on virtual threads), waits three seconds, interrupts the client
     * and then the server (giving each 500 ms to finish) and prints whether they are still alive.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // A socket file left over from an earlier, aborted run can make bind() fail.
        deleteSocketFile();
        try {
            Runnable serverRunnable = switch (BACKEND) {
                case JUNIXSOCKET -> SocketTest::serveUnixSocket;
                case SOCKET_CHANNEL -> SocketTest::serveSocketChannel;
            };
            Runnable clientRunnable = switch (BACKEND) {
                case JUNIXSOCKET -> SocketTest::readUnixSocket;
                case SOCKET_CHANNEL -> SocketTest::readSocketChannel;
            };
            var server = Thread.ofVirtual()
                    .start(serverRunnable);
            awaitServerReady(server);
            var client = Thread.ofVirtual()
                    .start(clientRunnable);
            Thread.sleep(3000);
            client.interrupt();
            client.join(500);
            server.interrupt();
            server.join(500);
            Function<Thread, String> alive = (b) -> b.isAlive() ? "alive" : "dead";
            System.out.println("After interrupt server is " + alive.apply(server) + " and client is " + alive.apply(client));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            deleteSocketFile();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment