Last active
March 14, 2021 15:47
-
-
Save HaloFour/78a0b1d4caa4b75599443e9f61a3e1f5 to your computer and use it in GitHub Desktop.
Test Netty Reactor Connection Pooling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;

public class NettyTest { | |
private static final AtomicReference<ConnectionPoolMetrics> metricsRef = new AtomicReference<>(); | |
private static Logger log; | |
private synchronized static void reportMetrics(String action) { | |
var metrics = metricsRef.get(); | |
if (metrics != null) { | |
log.info("{} - Acquired: {} Allocated: {} Pending: {} Idle: {}", action, | |
metrics.acquiredSize(), | |
metrics.allocatedSize(), | |
metrics.pendingAcquireSize(), | |
metrics.idleSize()); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Loggers.useVerboseConsoleLoggers(); | |
log = Loggers.getLogger(NettyTest.class); | |
Sinks.One<String> sink = Sinks.one(); | |
//var mono = sink.asMono(); | |
var mono = Mono.just("Done!"); | |
var random = new Random(); | |
var httpServer = HttpServer.create() | |
.host("localhost") | |
.port(8088) | |
.idleTimeout(Duration.ofMinutes(1)) | |
.route(routes -> | |
routes.get("/hello", (request, response) -> | |
mono.flatMap(value -> | |
Mono.from(response | |
.keepAlive(true) | |
.status(200) | |
.addHeader(HttpHeaderNames.CONTENT_LENGTH, "5") | |
.sendString(Mono.delay(Duration.ofMillis(random.nextInt(500) + 500)) | |
.map(ignored -> value)) | |
) | |
) | |
) | |
); | |
httpServer.warmup().block(); | |
var serverDisposable = httpServer.bindNow(); | |
var connectionProvider = ConnectionProvider.builder("NettyTest") | |
.maxConnections(20) | |
.maxIdleTime(Duration.ofHours(1)) | |
.maxLifeTime(Duration.ofHours(1)) | |
.pendingAcquireMaxCount(40) | |
.pendingAcquireTimeout(Duration.ofMillis(100)) | |
.fifo() | |
.metrics(true, () -> (poolName, id, remoteAddress, metrics) -> metricsRef.set(metrics)) | |
.build(); | |
var httpClient = HttpClient.create(connectionProvider) | |
.host("localhost") | |
.port(8088) | |
.doOnConnected(conn -> reportMetrics("Connected")) | |
.doOnDisconnected(conn -> reportMetrics("Disconnected")) | |
.keepAlive(true); | |
httpClient.warmup().block(); | |
var list = Flux.interval(Duration.ofMillis(100)) | |
.take(50) | |
.flatMap(rid -> httpClient.get() | |
.uri("http://localhost:8088/hello") | |
.responseContent() | |
.aggregate() | |
.asString() | |
.doOnSubscribe(s -> log.info("Starting request {}", rid)) | |
.doOnSuccess(s -> log.info("Finished request {}: {}", rid, s)) | |
.doOnError(e -> log.info("Request {} failed! {}", rid, e.getClass().getSimpleName())) | |
.onErrorResume(e -> Mono.just(e.getClass().getSimpleName()))) | |
.publish() | |
.autoConnect(0) | |
.collectList(); | |
reportMetrics("Wait for it..."); | |
Thread.sleep(10_000); | |
sink.tryEmitValue("DONE!"); | |
for (var result : list.block()) { | |
log.info(result); | |
} | |
Thread.sleep(10_000); | |
reportMetrics("Finished"); | |
connectionProvider.dispose(); | |
serverDisposable.disposeNow(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment