Last active
February 1, 2017 23:10
-
-
Save NiteshKant/9bf665902815a1209bdda228ebbbd866 to your computer and use it in GitHub Desktop.
rs-java-issue-229
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.net.InetSocketAddress; | |
| import java.net.SocketAddress; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| import io.reactivesocket.ReactiveSocket; | |
| import io.reactivesocket.client.KeepAliveProvider; | |
| import io.reactivesocket.client.ReactiveSocketClient; | |
| import io.reactivesocket.client.SetupProvider; | |
| import io.reactivesocket.frame.ByteBufferUtil; | |
| import io.reactivesocket.transport.tcp.client.TcpTransportClient; | |
| import io.reactivesocket.util.PayloadImpl; | |
| import io.reactivex.Flowable; | |
| import io.reactivex.functions.Consumer; | |
| import io.reactivex.schedulers.Schedulers; | |
| public class Client { | |
| public static class Performance { | |
| final String url; | |
| final int count; | |
| final double avgSize; | |
| public Performance(String url, int count, double avgSize) { | |
| super(); | |
| this.url = url; | |
| this.count = count; | |
| this.avgSize = avgSize; | |
| } | |
| public String getUrl() { | |
| return url; | |
| } | |
| public int getCount() { | |
| return count; | |
| } | |
| public double getAvgSize() { | |
| return avgSize; | |
| } | |
| @Override | |
| public String toString() { | |
| return "Performance [url=" + url + ", count=" + count + ", avgSize=" + avgSize + "]"; | |
| } | |
| } | |
| public static Flowable<Performance> subscribe(ReactiveSocket socket, String request) { | |
| final AtomicLong received = new AtomicLong(); | |
| final AtomicLong requested = new AtomicLong(); | |
| return Flowable.fromPublisher( | |
| socket.requestSubscription(new PayloadImpl(request))) | |
| .map(payload -> payload.getData()) | |
| .map(ByteBufferUtil::toUtf8String) | |
| .doOnRequest(n -> requested.addAndGet(n)) | |
| .doOnNext(s -> received.incrementAndGet()) | |
| .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // move one down to "fix" it | |
| .buffer(128) | |
| .map(l-> { | |
| double avgSize = l | |
| .stream() | |
| .mapToInt(String::length) | |
| .average() | |
| .orElse(0.0); | |
| return new Performance(request, l.size(), avgSize); | |
| }) | |
| .doFinally(() -> System.out.println("Requested: " + requested.get() + ", Received: " + received.get())); | |
| } | |
| public static void main(String[] args) throws InterruptedException { | |
| int port = 9000; | |
| String host = "localhost"; | |
| SocketAddress address = new InetSocketAddress(host, port); | |
| ReactiveSocket socket = Flowable.fromPublisher(ReactiveSocketClient.create(TcpTransportClient.create(address), | |
| SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease()).connect()).blockingFirst(); | |
| for (int i = 0; i < 1; i++) { | |
| subscribe(socket, "localhost:4096:Object"+i) | |
| .forEach(System.out::println); | |
| } | |
| Thread.sleep(10000000); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #### Server Output | |
| 01 Feb 2017 15:02:08,546 INFO [main] - Rx server started at port: 9000 | |
| 01 Feb 2017 15:02:15,777 INFO [rxnetty-nio-eventloop-1-2] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame | |
| Got request for [localhost, 4096, Object0] | |
| Requested: 13986. Sent: 13986 | |
| #### Client Output | |
| 01 Feb 2017 15:02:15,529 INFO [main] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
| io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
| at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
| at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89) | |
| at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) | |
| at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
| at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
| at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
| at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
| at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
| at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
| at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
| at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
| at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
| at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
| at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
| at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
| at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
| at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
| at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
| at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91) | |
| at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
| at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
| at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
| at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
| at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
| at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
| at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
| at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
| at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
| at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
| at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
| at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
| at java.lang.Thread.run(Thread.java:745) | |
| Exception in thread "pool-2-thread-1" io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
| at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
| at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89) | |
| at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) | |
| at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
| at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
| at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
| at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
| at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
| at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
| at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
| at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
| at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
| at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
| at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
| at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
| at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
| at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
| at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
| at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91) | |
| at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
| at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
| at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
| at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
| at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
| at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
| at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
| at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
| at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
| at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
| at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
| at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
| at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
| at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
| at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
| at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
| at java.lang.Thread.run(Thread.java:745) | |
| __Requested: 13952, Received: 13910__ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.BufferedReader; | |
| import java.io.IOException; | |
| import java.io.InputStreamReader; | |
| import java.util.Arrays; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| import org.reactivestreams.Publisher; | |
| import io.reactivesocket.AbstractReactiveSocket; | |
| import io.reactivesocket.Payload; | |
| import io.reactivesocket.frame.ByteBufferUtil; | |
| import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; | |
| import io.reactivesocket.server.ReactiveSocketServer; | |
| import io.reactivesocket.transport.tcp.server.TcpTransportServer; | |
| import io.reactivesocket.util.PayloadImpl; | |
| import io.reactivex.Flowable; | |
| import org.slf4j.event.Level; | |
| public class Server { | |
| public static void main(String[] args) throws IOException { | |
| int port = 9000; | |
| ReactiveSocketServer.create(TcpTransportServer.create(port)) | |
| .start((setupPayload, reactiveSocket) -> { | |
| return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { | |
| @Override | |
| public Publisher<Payload> requestResponse(Payload p) { | |
| return Flowable.just(p); | |
| } | |
| @Override | |
| public Publisher<Payload> requestSubscription(Payload p) { | |
| String[] request = ByteBufferUtil.toUtf8String(p.getData()).split(":"); | |
| int size = Integer.parseInt(request[1]); | |
| System.out.println("Got request for " + Arrays.toString(request)); | |
| final byte[] buff = new byte[size]; | |
| Arrays.fill(buff, (byte)65); | |
| final AtomicLong requested = new AtomicLong(); | |
| final AtomicLong sent = new AtomicLong(); | |
| Flowable.interval(30, TimeUnit.SECONDS) | |
| .doOnNext(aLong -> System.out.println("Requested: " + requested.get() | |
| + ". Sent: " + sent.get())) | |
| .ignoreElements() | |
| .ambWith(Flowable.fromPublisher(reactiveSocket.onClose()).ignoreElements()) | |
| .doFinally(() -> System.out.println("Requested: " + requested.get() | |
| + ". Sent: " + sent.get())) | |
| .subscribe(); | |
| return Flowable.range(1, Integer.MAX_VALUE) | |
| .doOnRequest(n -> requested.addAndGet(n)) | |
| .map(integer -> { | |
| sent.incrementAndGet(); | |
| return new PayloadImpl(buff); | |
| }); | |
| } | |
| }); | |
| }); | |
| new BufferedReader(new InputStreamReader(System.in)).readLine(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment