Last active
June 14, 2022 13:13
-
-
Save mdindoffer/c0e65bd9075ca7bdc8b639ebbf2c0fde to your computer and use it in GitHub Desktop.
RSocket Requester & Responder leak
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.rsocket.RSocket; | |
import io.rsocket.core.RSocketConnector; | |
import io.rsocket.frame.decoder.PayloadDecoder; | |
import io.rsocket.test.PingClient; | |
import io.rsocket.transport.netty.client.TcpClientTransport; | |
import org.HdrHistogram.Recorder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import reactor.core.publisher.Mono; | |
public class RequesterResponderLeakReproducerClient { | |
private static final Logger LOG = LoggerFactory.getLogger(RequesterResponderLeakReproducerClient.class); | |
public static void main(String[] args) { | |
new RequesterResponderLeakReproducerClient().createClientAndExchangePayloadTCP(); | |
} | |
private void createClientAndExchangePayloadTCP() { | |
Mono<RSocket> client = | |
RSocketConnector.create() | |
.payloadDecoder(PayloadDecoder.ZERO_COPY) | |
.connect(TcpClientTransport.create(7878)); | |
PingClient pingClient = new PingClient(client); | |
RSocket clientRsocket = client.block(); | |
clientRsocket.onClose().subscribe(null, throwable -> LOG.error("RSocket client throwable {}", throwable), () -> LOG.info("Completed closing of the Client")); | |
int count = 1_000; | |
pingClient | |
.requestResponsePingPong(count, new Recorder(3600000000000L, 3)) | |
.doOnTerminate(() -> LOG.info("Sent {} messages", count)) | |
.blockLast(); | |
clientRsocket.dispose(); | |
clientRsocket.onClose().block(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.rsocket.core.RSocketServer; | |
import io.rsocket.frame.decoder.PayloadDecoder; | |
import io.rsocket.test.PingHandler; | |
import io.rsocket.transport.netty.server.CloseableChannel; | |
import io.rsocket.transport.netty.server.TcpServerTransport; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Hooks; | |
import reactor.core.publisher.Mono; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Optional; | |
import java.util.stream.Collectors; | |
public class RequesterResponderLeakReproducerServer { | |
private static final Logger LOG = LoggerFactory.getLogger(RequesterResponderLeakReproducerServer.class); | |
private final Map<String, Integer> subscriptionStats = Collections.synchronizedMap(new HashMap<>()); | |
public static void main(String[] args) throws InterruptedException { | |
new RequesterResponderLeakReproducerServer().run(); | |
} | |
private void run() throws InterruptedException { | |
registerReactorHooks(); | |
createAndStartServerTCP(); | |
while (true) { | |
System.gc(); | |
printSubscriptionStats(); | |
Thread.sleep(2000); | |
} | |
} | |
private CloseableChannel createAndStartServerTCP() { | |
return RSocketServer.create(new PingHandler()) | |
.payloadDecoder(PayloadDecoder.ZERO_COPY) | |
.bind(TcpServerTransport.create(7878)) | |
.block(); | |
} | |
private void printSubscriptionStats() { | |
LOG.info("Active Reactor subscriptions: \n{}", | |
subscriptionStats.entrySet().stream() | |
.sorted(Map.Entry.comparingByKey()) | |
.map(Object::toString) | |
.collect(Collectors.joining("\n"))); | |
} | |
private void registerReactorHooks() { | |
Hooks.onLastOperator(objectPublisher -> { | |
Optional<String> callerFrameLoc = StackWalker.getInstance() | |
.walk(stackFrameStream -> stackFrameStream | |
.filter(stackFrame -> !stackFrame.getClassName().startsWith("java.")) | |
.filter(stackFrame -> !stackFrame.getClassName().contains(this.getClass().getName())) | |
.filter(stackFrame -> !stackFrame.getClassName().startsWith("reactor.")) | |
.map(Object::toString) | |
.findFirst()); | |
callerFrameLoc.ifPresent(stackFrame -> subscriptionStats.merge(stackFrame, 1, Integer::sum)); | |
if (objectPublisher instanceof Mono) { | |
return Hooks.convertToMonoBypassingHooks(objectPublisher, false) | |
.doFinally(signalType -> callerFrameLoc.ifPresent(this::unregisterSubscriptionFrame)); | |
} else if (objectPublisher instanceof Flux) { | |
return Hooks.convertToFluxBypassingHooks(objectPublisher) | |
.doFinally(signalType -> callerFrameLoc.ifPresent(this::unregisterSubscriptionFrame)); | |
} else { | |
return objectPublisher; | |
} | |
}); | |
} | |
private void unregisterSubscriptionFrame(String callerFrame) { | |
subscriptionStats.merge(callerFrame, 1, (integer, integer2) -> { | |
int out = integer - integer2; | |
if (out == 0) { | |
return null; | |
} else { | |
return out; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment