def connect(productIdFieldName: String, token: String, checkLicenseUpdate: Boolean = false): GrpcCallActionBuilder[ProductConnectionReq, ProductStrmgMessage] = {
  val dT = Calendar.getInstance()
  grpc("connectReq")
    .service(ProductServiceGrpc.stub)
    .rpc(service => { req: ProductConnectionReq =>
      val p = Promise[ProductStrmgMessage]()
      val requestObserver = service.connect(
        new ClientResponseObserver[ProductConnectionReq, ProductStrmgMessage] {
          var requestStream: ClientCallStreamObserver[ProductConnectionReq] = _
          override def beforeStart(requestStream: ClientCallStreamObserver[ProductConnectionReq]): Unit = {
            this.requestStream = requestStream
          }
          override def onNext(value: ProductStrmgMessage): Unit = {
            println(s"[${dT.getTime()}]==================================================>Next " + value.data.isLicenseUpdate)
            if (!checkLicenseUpdate) {
              p.trySuccess(value)
            }
            if (checkLicenseUpdate && value.data.isLicenseUpdate) {
              p.trySuccess(value)
            }
          }
          override def onError(throwable: Throwable): Unit = {
            if (p != null) {
              p.tryFailure(throwable)
            }
          }
          override def onCompleted(): Unit = {
            println(s"[${dT.getTime()}]==================>Completed!")
          }
        })
      requestObserver.onNext(req)
      p.future
    })(ProductConnectionReq.defaultInstance.withLastpublicip("192.168.1.9"))
    .header(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))(token)
    .header(Metadata.Key.of("productId", Metadata.ASCII_STRING_MARSHALLER))($(productIdFieldName))
}

// called with this
val hwIdFeeder = (1 to nbProducts).toStream.map(i => Map("hwid" -> ("perf" + i.toString))).toIterator
val productsScenario = scenario("Products connect and disconnect after 1 minute")
  .feed(hwIdFeeder)
  .exec(ProductsServiceGrpc.connect("hwid", token))

// the grpc proto rpc method looks like this
// rpc connect (stream ProductConnectionReq) returns (stream ProductStrmgMessage);
Thanks @unludo for the quick response. I have added a non-blocking server-stream call, but I am seeing an issue:
exec(grpc("Server Stream Request")
  .service(ServerStreamerGrpc.stub)
  .rpc(service => { req: StreamRequest =>
    val p = Promise[StreamResponse]()
    // The observer receives responses, so it is typed on StreamResponse.
    val responseObserver = new StreamObserver[StreamResponse] {
      override def onNext(value: StreamResponse): Unit = {
        println(s"[${dT.getTime()}]==================================================>onNext " + value.requestId)
        p.trySuccess(value)
      }
      override def onError(throwable: Throwable): Unit = {
        println(s"[${dT.getTime()}]==================>onError!")
        if (p != null) {
          p.tryFailure(throwable)
        }
      }
      override def onCompleted(): Unit = {
        println(s"[${dT.getTime()}]==================>onCompleted!")
      }
    }
    service.serverStream(req, responseObserver)
    p.future
  })(StreamRequest.defaultInstance.withRequestId(10L)))
- Whenever the first response from the server stream is grpc-status: 0, content-length: 0, endStream=true, the Gatling actor hangs and the simulation does not proceed any further.
@UserNoOne In this case onCompleted fires, right? Then the future is completed and the step is finished. Does the server you are testing end the stream? That seems to be what happens. You need to keep the stream alive, so if the server ends it, you can't go further. Can you change this behavior?
Yes @unludo, onCompleted is fired. Unfortunately my server is a black box (I have no information apart from the proto files). What changes do I need to make on the client side, i.e. the Gatling side?
In normal cases the client receives many responses from the server, the call ends with either onCompleted or onError, and the stream is closed. Only in the case described above does the simulation hang and stop processing.
@UserNoOne If onCompleted fires, it could also be that the server drops the connection because of a network issue. Maybe it requires some kind of keepalive.
I tried adding keepalive on the client side, but something else seems to be going wrong.
Even if the server closes the connection, normal client operation shouldn't be affected AFAIK, since onCompleted has already fired.
This is what happens in that case:
10:09:48.562 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
10:09:48.565 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
10:09:48.574 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=250, INITIAL_WINDOW_SIZE=1048576, MAX_FRAME_SIZE=1048576, MAX_HEADER_LIST_SIZE=1048896}
10:09:48.576 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=true
10:09:48.609 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: example.com:443, :path: /ServerStreamer/serverStream, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
10:09:48.617 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND DATA: streamId=3 padding=0 endStream=true length=7 bytes=00000000020809
10:09:48.881 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
10:09:48.883 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=true
10:09:48.931 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7
10:09:48.948 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 0, content-length: 0, date: Wed, 23 Sep 2020 04:39:48 GMT] padding=0 endStream=true
[1600835986150]==================>onCompleted!
After about 5 minutes, I observed this seemingly unrelated debug log:
10:12:48.978 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2] INBOUND GO_AWAY: lastStreamId=3 errorCode=0 length=0 bytes=
I verified the server logs as well. The issue appears on the Gatling/client side only when no data is received from serverStream.
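One possible reading of the hang, which nothing in this thread confirms: in the server-stream snippet above, the promise p is completed only in onNext and onError, while onCompleted just prints. When the stream closes with grpc-status: 0 before any message arrives, p.future is therefore never resolved. Below is a minimal sketch of an observer that resolves the promise on every terminal path. Observer is a hypothetical stand-in for io.grpc.stub.StreamObserver so that the sketch runs without gRPC on the classpath; EmptyStreamException and firstMessageObserver are illustrative names, not part of gRPC or of the Gatling plugin.

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Success}

object EmptyStreamSketch {

  // Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
  trait Observer[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Illustrative exception for a stream that closed before sending any message.
  final class EmptyStreamException
      extends RuntimeException("stream completed without any message")

  // Returns an observer that resolves `p` on every terminal path.
  def firstMessageObserver[T](p: Promise[T]): Observer[T] = new Observer[T] {
    override def onNext(value: T): Unit = { p.trySuccess(value); () }
    override def onError(t: Throwable): Unit = { p.tryFailure(t); () }
    // No effect if onNext already completed the promise; otherwise the
    // promise fails, so whoever waits on p.future is released.
    override def onCompleted(): Unit = { p.tryFailure(new EmptyStreamException); () }
  }

  def main(args: Array[String]): Unit = {
    // Case from the thread: the stream ends without a single message.
    val empty = Promise[String]()
    firstMessageObserver(empty).onCompleted()
    empty.future.value match {
      case Some(Failure(e)) => println(s"empty stream -> failed: ${e.getMessage}")
      case other            => println(s"unexpected: $other")
    }

    // Normal case: a message arrives, then the stream completes.
    val normal = Promise[String]()
    val obs = firstMessageObserver(normal)
    obs.onNext("first")
    obs.onCompleted()
    normal.future.value match {
      case Some(Success(v)) => println(s"normal stream -> succeeded: $v")
      case other            => println(s"unexpected: $other")
    }
  }
}
```

Whether an empty stream should count as a failure or as a success (for example by completing the promise with a default message) depends on what the test is meant to check; tryFailure is only one choice.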
Below is the normal flow, when everything goes well:
15:35:42.711 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, OUTBOUND HEADERS: streamId=13
headers=GrpcHttp2OutboundHeaders[:authority: example:443, :path: /ServerStreamer/serverStream, :method: POST,
:scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip]
streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
15:35:42.713 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, OUTBOUND DATA: streamId=13 padding=0 endStream=true length=7 bytes=0000000002081e
15:35:43.481 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7
15:35:43.714 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders
[:status: 200, content-type: application/grpc, grpc-accept-encoding: gzip, grpc-encoding: identity, date: Wed, 23 Sep 2020 10:05:43 GMT]
padding=0 endStream=false
15:35:43.715 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f6bbacfb051080c999ac03
15:35:44.702 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f7bbacfb0510c0bcfeec01
15:35:44.827 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=20 bytes=000000000f081e120b08f8bbacfb0510808cfb29
15:35:44.828 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] padding=0 endStream=true
[1600855542707]==================>onCompleted!
@UserNoOne If you want to have parallel calls, you need to create a simulation made of several scenarios. Something like this:
val scenario1 = scenario(...
val scenario2 = scenario(...
setUp(
  scenario1.inject(... ),
  scenario2.inject(... ))
The scenarios will run in parallel.