Created
February 26, 2020 15:39
-
-
Save unludo/299ddd977fb92699c233529877e97565 to your computer and use it in GitHub Desktop.
bidir stream example for gatling-grpc - below 0.8 version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Builds the gatling-grpc action that opens the bidirectional `connect` stream. | |
| * | |
| * The action sends a single `ProductConnectionReq` and resolves with the first | |
| * `ProductStrmgMessage` accepted by the filter described under `checkLicenseUpdate`. | |
| * If the stream reports an error, or completes before any message was accepted, | |
| * the call fails instead of leaving the virtual user waiting forever. | |
| * | |
| * @param productIdFieldName name handed to `$(...)` to produce the `productId` header | |
| *                           (presumably a session attribute name; confirm against the feeder) | |
| * @param token              value sent in the `authorization` header | |
| * @param checkLicenseUpdate when true, only a message whose `data.isLicenseUpdate` is true | |
| *                           completes the call; when false, the first message received does | |
| * @return the configured call action builder | |
| */ | |
| def connect(productIdFieldName: String, token: String, checkLicenseUpdate:Boolean = false): GrpcCallActionBuilder[ProductConnectionReq, ProductStrmgMessage] = { | |
|   // Read the clock for every log line; a single Calendar captured up front would print the same time forever. | |
|   def now = Calendar.getInstance().getTime() | |
|   grpc("connectReq") | |
|     .service(ProductServiceGrpc.stub) | |
|     .rpc(service => { req: ProductConnectionReq => | |
|       val p = Promise[ProductStrmgMessage]() | |
|       val requestObserver = service.connect( | |
|         new ClientResponseObserver[ProductConnectionReq, ProductStrmgMessage] { | |
|           // Handle on the outbound stream, stored in beforeStart; not otherwise used in this snippet. | |
|           var requestStream: ClientCallStreamObserver[ProductConnectionReq] = _ | |
|           override def beforeStart(requestStream: ClientCallStreamObserver[ProductConnectionReq]): Unit = { | |
|             this.requestStream = requestStream | |
|           } | |
|           override def onNext(value: ProductStrmgMessage): Unit = { | |
|             println(s"[${now}]==================================================>Next " + value.data.isLicenseUpdate) | |
|             // Without the license filter every message qualifies; with it, only license updates do. | |
|             val accepted = if (checkLicenseUpdate) value.data.isLicenseUpdate else true | |
|             // trySuccess is a no-op once the promise is completed, so later messages are ignored. | |
|             if (accepted) { | |
|               p.trySuccess(value) | |
|             } | |
|           } | |
|           override def onError(throwable: Throwable): Unit = { | |
|             p.tryFailure(throwable) | |
|           } | |
|           override def onCompleted(): Unit = { | |
|             println(s"[${now}]==================>Completed!") | |
|             // The server may close the stream without ever sending an accepted message. | |
|             // Fail the promise in that case so the returned future always terminates; | |
|             // tryFailure does nothing when a message has already completed it. | |
|             p.tryFailure(new NoSuchElementException("connect stream completed before an expected ProductStrmgMessage was received")) | |
|           } | |
|         }) | |
|       requestObserver.onNext(req) | |
|       p.future | |
|     })(ProductConnectionReq.defaultInstance.withLastpublicip("192.168.1.9")) | |
|     .header(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))(token) | |
|     .header(Metadata.Key.of("productId", Metadata.ASCII_STRING_MARSHALLER))($(productIdFieldName)) | |
| } | |
| // called with this | |
| // Lazily yields one feeder record per product: "hwid" -> "perf1", "perf2", ... up to nbProducts. | |
| val hwIdFeeder = (1 to nbProducts).iterator.map(index => Map("hwid" -> s"perf$index")) | |
| // Each virtual user takes the next "hwid" record and opens the connect stream with it. | |
| val productsScenario = | |
|   scenario("Products connect and disconnect after 1 minute") | |
|     .feed(hwIdFeeder) | |
|     .exec(ProductsServiceGrpc.connect("hwid", token)) | |
| // the grpc proto rpc method looks like this | |
| // rpc connect (stream ProductConnectionReq) returns (stream ProductStrmgMessage); |
@UserNoOne If onCompleted fires, it may also be that the server closed the connection because of a network issue. Maybe it requires some kind of keepalive.
I tried adding a keep-alive on the client side, but something else seems to be going wrong.
Even if the server closes the connection, the client's normal operation shouldn't be impacted AFAIK, as it has already fired onComplete.
This is what happens in that case:
10:09:48.562 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
10:09:48.565 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
10:09:48.574 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=250, INITIAL_WINDOW_SIZE=1048576, MAX_FRAME_SIZE=1048576, MAX_HEADER_LIST_SIZE=1048896}
10:09:48.576 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=true
10:09:48.609 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: example.com:443, :path: /ServerStreamer/serverStream, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10:09:48.617 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND DATA: streamId=3 padding=0 endStream=true length=7 bytes=00000000020809
10:09:48.881 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
10:09:48.883 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=true
10:09:48.931 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7
10:09:48.948 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 0, content-length: 0, date: Wed, 23 Sep 2020 04:39:48 GMT] padding=0 endStream=true
[1600835986150]==================>onCompleted!
After about 5 minutes, I observed this seemingly random debug log:
10:12:48.978 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2] INBOUND GO_AWAY: lastStreamId=3 errorCode=0 length=0 bytes=
I verified the server logs as well; the issue appears on the Gatling/client side only when no data is received from serverStream.
Below is the normal flow when everything goes okay:
15:35:42.711 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, OUTBOUND HEADERS: streamId=13
headers=GrpcHttp2OutboundHeaders[:authority: example:443, :path: /ServerStreamer/serverStream, :method: POST,
:scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip]
streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
15:35:42.713 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, OUTBOUND DATA: streamId=13 padding=0 endStream=true length=7 bytes=0000000002081e
15:35:43.481 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7
15:35:43.714 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders
[:status: 200, content-type: application/grpc, grpc-accept-encoding: gzip, grpc-encoding: identity, date: Wed, 23 Sep 2020 10:05:43 GMT]
padding=0 endStream=false
15:35:43.715 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f6bbacfb051080c999ac03
15:35:44.702 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f7bbacfb0510c0bcfeec01
15:35:44.827 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=20 bytes=000000000f081e120b08f8bbacfb0510808cfb29
15:35:44.828 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] padding=0 endStream=true
[1600855542707]==================>onCompleted!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In normal cases, after receiving a lot of responses from the server, the stream ends with either onComplete or onError and is then closed. Only in the case described above (no data received before completion) does the simulation hang and stop processing further.