|
import com.sun.net.httpserver.HttpServer; |
|
import java.io.IOException; |
|
import java.net.InetSocketAddress; |
|
import java.net.URI; |
|
import java.net.URISyntaxException; |
|
import java.nio.charset.StandardCharsets; |
|
import java.time.Duration; |
|
import java.util.Map; |
|
import java.util.concurrent.CompletableFuture; |
|
import java.util.concurrent.ExecutionException; |
|
import java.util.concurrent.Executors; |
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
import java.util.concurrent.atomic.AtomicInteger; |
|
import java.util.concurrent.atomic.AtomicLong; |
|
import java.util.stream.IntStream; |
|
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; |
|
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; |
|
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; |
|
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; |
|
|
|
public class AwsSdkV2AndVirtualThreads { |
|
private static final int PORT = 8080; |
|
private static final Duration REQUEST_LATENCY = Duration.ofSeconds(5); |
|
private static final int TOTAL_REQUESTS = 1_000; |
|
|
|
public static void main(final String[] args) throws ExecutionException, InterruptedException, IOException, URISyntaxException { |
|
final var atomicNanoTime = new AtomicLong(); |
|
|
|
// 1. Create a local HTTP server for demo purposes |
|
final var server = createServer(atomicNanoTime); |
|
server.start(); |
|
System.out.printf("Server started on http://localhost:%s%n", PORT); |
|
|
|
// 2. Create an AWS SDK client and warm it up |
|
final var client = createClient(); |
|
System.out.println("Warming up client..."); |
|
atomicNanoTime.set(System.nanoTime()); |
|
sendRequest(client).get(); |
|
System.out.printf("Finished in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get())); |
|
|
|
// 3. Send the requests |
|
sendRequests(client, atomicNanoTime); |
|
|
|
server.stop(1); |
|
} |
|
|
|
private static HttpServer createServer(final AtomicLong atomicNanoTime) throws IOException { |
|
final var firstRequest = new AtomicBoolean(true); |
|
final var requestCount = new AtomicInteger(0); |
|
final var responseCount = new AtomicInteger(0); |
|
final var server = HttpServer.create(new InetSocketAddress(PORT), 1_000_000); |
|
server.createContext("/", exchange -> { |
|
if (requestCount.incrementAndGet() == TOTAL_REQUESTS + 1) { |
|
System.out.printf("Received all requests in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get())); |
|
} |
|
|
|
if (firstRequest.get()) { |
|
firstRequest.set(false); |
|
} else { |
|
try { |
|
Thread.sleep(REQUEST_LATENCY); |
|
} catch (InterruptedException e) { |
|
throw new RuntimeException(e); |
|
} |
|
} |
|
|
|
final var response = "{\"Item\":{\"...\":{\"S\":\"...\"}}}".getBytes(StandardCharsets.UTF_8); |
|
exchange.getResponseHeaders().add("content-type", "application/json"); |
|
exchange.sendResponseHeaders(200, response.length); |
|
exchange.getResponseBody().write(response); |
|
exchange.close(); |
|
|
|
if (responseCount.incrementAndGet() == TOTAL_REQUESTS + 1) { |
|
System.out.printf("Returned all responses in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get())); |
|
} |
|
}); |
|
server.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); |
|
|
|
return server; |
|
} |
|
|
|
private static DynamoDbAsyncClient createClient() throws URISyntaxException { |
|
return DynamoDbAsyncClient |
|
.builder() |
|
.endpointOverride(new URI("http://localhost:%s".formatted(PORT))) |
|
.httpClientBuilder( |
|
// See: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-netty.html |
|
NettyNioAsyncHttpClient |
|
.builder() |
|
.maxConcurrency(TOTAL_REQUESTS) |
|
) |
|
.asyncConfiguration(b -> b |
|
.advancedOption( |
|
// See: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/asynchronous.html |
|
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, |
|
Executors.newVirtualThreadPerTaskExecutor() |
|
) |
|
) |
|
.build(); |
|
} |
|
|
|
private static void sendRequests(final DynamoDbAsyncClient client, final AtomicLong atomicNanoTime) { |
|
final var executor = Executors.newVirtualThreadPerTaskExecutor(); |
|
|
|
try (client; executor) { |
|
System.out.println("Sending requests..."); |
|
atomicNanoTime.set(System.nanoTime()); |
|
final var futures = IntStream |
|
.range(0, TOTAL_REQUESTS) |
|
.mapToObj((i) -> CompletableFuture.runAsync( |
|
() -> { |
|
try { |
|
final var future = sendRequest(client); |
|
|
|
/* |
|
* This will block the current virtual thread, releasing the carrier platform thread to do other work. |
|
* We use `.get()` instead of `.join()` so the Virtual Thread can react to interrupts. |
|
*/ |
|
final var response = future.get(); |
|
|
|
// TODO: do something with response |
|
} catch (ExecutionException | InterruptedException e) { |
|
e.printStackTrace(System.out); |
|
} |
|
}, |
|
executor |
|
)) |
|
.toArray(CompletableFuture[]::new); |
|
|
|
CompletableFuture.allOf(futures).join(); |
|
System.out.printf("Finished in: %s%n", Duration.ofNanos(System.nanoTime() - atomicNanoTime.get())); |
|
} |
|
} |
|
|
|
private static CompletableFuture<?> sendRequest(final DynamoDbAsyncClient client) { |
|
/* |
|
* This will complete asynchronously via Netty/CRT's internal threads (if any). |
|
* In the case of Netty HTTP client, AWS SDK will use a virtual thread to process the |
|
* continuation of the future returned by Netty. |
|
*/ |
|
return client.getItem(r -> r.tableName("...").key(Map.of("...", AttributeValue.fromS("...")))); |
|
} |
|
} |