-
-
Save cenodis/57c5239a879821211ec0b9ca9aa4f863 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.cenodis; | |
import org.newsclub.net.unix.AFUNIXServerSocket; | |
import org.newsclub.net.unix.AFUNIXSocket; | |
import org.newsclub.net.unix.AFUNIXSocketAddress; | |
import java.io.IOException; | |
import java.net.StandardProtocolFamily; | |
import java.net.UnixDomainSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.util.ArrayList; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.function.Function; | |
/** | |
* Demonstrates difference in pinning behaviour between java.nio.channel.SocketChannel and | |
* org.newsclub.net.unix.AFUNIXSocket. | |
* <p/> | |
* Make sure to run this class with a single carrier thread for all virtual threads: | |
* -Djdk.virtualThreadScheduler.maxPoolSize=1 | |
*/ | |
public class SocketTest { | |
// change this to switch between sockets via java.nio.channels and org.newsclub.net.unix | |
private static final Backend BACKEND = Backend.JUNIXSOCKET; | |
private static final Path SOCKET_PATH = Path.of("/", "tmp", "test_socket"); | |
private static final UnixDomainSocketAddress SOCKET_CHANNEL_ADDR = UnixDomainSocketAddress.of(SOCKET_PATH); | |
private static final CountDownLatch SOCKET_READY = new CountDownLatch(1); | |
private enum Backend { | |
SOCKET_CHANNEL, | |
JUNIXSOCKET, | |
} | |
private static void serveSocketChannel() { | |
var sockets = new ArrayList<SocketChannel>(); | |
try (var channel = ServerSocketChannel.open(StandardProtocolFamily.UNIX)) { | |
channel.bind(SOCKET_CHANNEL_ADDR); | |
System.out.println("Server socket channel created"); | |
SOCKET_READY.countDown(); | |
while (true) { | |
var socket = channel.accept(); | |
sockets.add(socket); | |
} | |
} catch (IOException e) { | |
if (!Thread.interrupted()) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("Server closed by interrupt"); | |
} finally { | |
for (var socket : sockets) { | |
try { | |
socket.close(); | |
} catch (IOException e) {} | |
} | |
} | |
} | |
private static void readSocketChannel() { | |
try (var channel = SocketChannel.open(SOCKET_CHANNEL_ADDR)) { | |
var buffer = ByteBuffer.allocate(10); | |
System.out.println("Client channel reading from socket"); | |
channel.read(buffer); | |
} catch (IOException e) { | |
if (!Thread.interrupted()) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("Client closed by interrupt"); | |
} | |
} | |
private static void serveUnixSocket() { | |
var sockets = new ArrayList<AFUNIXSocket>(); | |
try (var socket = AFUNIXServerSocket.newInstance()) { | |
socket.bind(AFUNIXSocketAddress.of(SOCKET_PATH)); | |
System.out.println("Server unix socket created"); | |
SOCKET_READY.countDown(); | |
while (true) { | |
var client = socket.accept(); | |
sockets.add(client); | |
} | |
} catch (IOException e) { | |
if (!Thread.interrupted()) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("Server closed by interrupt"); | |
} finally { | |
for (var socket : sockets) { | |
try { | |
socket.close(); | |
} catch (IOException e) {} | |
} | |
} | |
} | |
private static void readUnixSocket() { | |
try (var socket = AFUNIXSocket.newInstance()) { | |
socket.connect(AFUNIXSocketAddress.of(SOCKET_PATH)); | |
var buffer = new byte[10]; | |
System.out.println("Client unix socket reading from socket"); | |
try (var input = socket.getInputStream()) { | |
input.read(buffer); | |
} | |
} catch (IOException e) { | |
if (!Thread.interrupted()) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("Client closed by interrupt"); | |
} | |
} | |
public static void main(String[] args) { | |
try { | |
Runnable serverRunnable = switch (BACKEND) { | |
case JUNIXSOCKET -> SocketTest::serveUnixSocket; | |
case SOCKET_CHANNEL -> SocketTest::serveSocketChannel; | |
}; | |
Runnable clientRunnable = switch (BACKEND) { | |
case JUNIXSOCKET -> SocketTest::readUnixSocket; | |
case SOCKET_CHANNEL -> SocketTest::readSocketChannel; | |
}; | |
var server = Thread.ofVirtual() | |
.start(serverRunnable); | |
SOCKET_READY.await(); | |
var client = Thread.ofVirtual() | |
.start(clientRunnable); | |
Thread.sleep(3000); | |
client.interrupt(); | |
client.join(500); | |
server.interrupt(); | |
server.join(500); | |
Function<Thread, String> alive = (b) -> b.isAlive() ? "alive" : "dead"; | |
System.out.println("After interrupt server is " + alive.apply(server) + " and client is " + alive.apply(client)); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
try { | |
Files.deleteIfExists(SOCKET_PATH); | |
} catch (IOException e) {} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment