Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Last active March 14, 2021 15:47
Show Gist options
  • Save HaloFour/78a0b1d4caa4b75599443e9f61a3e1f5 to your computer and use it in GitHub Desktop.
Save HaloFour/78a0b1d4caa4b75599443e9f61a3e1f5 to your computer and use it in GitHub Desktop.
Test Netty Reactor Connection Pooling
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.handler.codec.http.HttpHeaderNames;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;
public class NettyTest {
private static final AtomicReference<ConnectionPoolMetrics> metricsRef = new AtomicReference<>();
private static Logger log;
private synchronized static void reportMetrics(String action) {
var metrics = metricsRef.get();
if (metrics != null) {
log.info("{} - Acquired: {} Allocated: {} Pending: {} Idle: {}", action,
metrics.acquiredSize(),
metrics.allocatedSize(),
metrics.pendingAcquireSize(),
metrics.idleSize());
}
}
public static void main(String[] args) throws Exception {
Loggers.useVerboseConsoleLoggers();
log = Loggers.getLogger(NettyTest.class);
Sinks.One<String> sink = Sinks.one();
//var mono = sink.asMono();
var mono = Mono.just("Done!");
var random = new Random();
var httpServer = HttpServer.create()
.host("localhost")
.port(8088)
.idleTimeout(Duration.ofMinutes(1))
.route(routes ->
routes.get("/hello", (request, response) ->
mono.flatMap(value ->
Mono.from(response
.keepAlive(true)
.status(200)
.addHeader(HttpHeaderNames.CONTENT_LENGTH, "5")
.sendString(Mono.delay(Duration.ofMillis(random.nextInt(500) + 500))
.map(ignored -> value))
)
)
)
);
httpServer.warmup().block();
var serverDisposable = httpServer.bindNow();
var connectionProvider = ConnectionProvider.builder("NettyTest")
.maxConnections(20)
.maxIdleTime(Duration.ofHours(1))
.maxLifeTime(Duration.ofHours(1))
.pendingAcquireMaxCount(40)
.pendingAcquireTimeout(Duration.ofMillis(100))
.fifo()
.metrics(true, () -> (poolName, id, remoteAddress, metrics) -> metricsRef.set(metrics))
.build();
var httpClient = HttpClient.create(connectionProvider)
.host("localhost")
.port(8088)
.doOnConnected(conn -> reportMetrics("Connected"))
.doOnDisconnected(conn -> reportMetrics("Disconnected"))
.keepAlive(true);
httpClient.warmup().block();
var list = Flux.interval(Duration.ofMillis(100))
.take(50)
.flatMap(rid -> httpClient.get()
.uri("http://localhost:8088/hello")
.responseContent()
.aggregate()
.asString()
.doOnSubscribe(s -> log.info("Starting request {}", rid))
.doOnSuccess(s -> log.info("Finished request {}: {}", rid, s))
.doOnError(e -> log.info("Request {} failed! {}", rid, e.getClass().getSimpleName()))
.onErrorResume(e -> Mono.just(e.getClass().getSimpleName())))
.publish()
.autoConnect(0)
.collectList();
reportMetrics("Wait for it...");
Thread.sleep(10_000);
sink.tryEmitValue("DONE!");
for (var result : list.block()) {
log.info(result);
}
Thread.sleep(10_000);
reportMetrics("Finished");
connectionProvider.dispose();
serverDisposable.disposeNow();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment