@unludo
Created February 26, 2020 15:39
Bidirectional streaming example for gatling-grpc (versions below 0.8)
def connect(productIdFieldName: String, token: String, checkLicenseUpdate: Boolean = false): GrpcCallActionBuilder[ProductConnectionReq, ProductStrmgMessage] = {
  val dT = Calendar.getInstance() // captured once: the log lines below all show this creation time
  grpc("connectReq")
    .service(ProductServiceGrpc.stub)
    .rpc(service => { req: ProductConnectionReq =>
      val p = Promise[ProductStrmgMessage]()
      val requestObserver = service.connect(
        new ClientResponseObserver[ProductConnectionReq, ProductStrmgMessage] {
          var requestStream: ClientCallStreamObserver[ProductConnectionReq] = _

          override def beforeStart(requestStream: ClientCallStreamObserver[ProductConnectionReq]): Unit = {
            this.requestStream = requestStream
          }

          override def onNext(value: ProductStrmgMessage): Unit = {
            println(s"[${dT.getTime()}] ==> onNext " + value.data.isLicenseUpdate)
            // Resolve the promise with the first message or, when
            // checkLicenseUpdate is set, with the first license-update message.
            if (!checkLicenseUpdate || value.data.isLicenseUpdate) {
              p.trySuccess(value)
            }
          }

          override def onError(throwable: Throwable): Unit = {
            p.tryFailure(throwable)
          }

          override def onCompleted(): Unit = {
            println(s"[${dT.getTime()}] ==> onCompleted")
          }
        })
      requestObserver.onNext(req)
      p.future
    })(ProductConnectionReq.defaultInstance.withLastpublicip("192.168.1.9"))
    .header(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))(token)
    .header(Metadata.Key.of("productId", Metadata.ASCII_STRING_MARSHALLER))($(productIdFieldName))
}
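The onNext gating above can be modeled with a plain scala.concurrent.Promise, independent of gRPC. This is a hypothetical, self-contained sketch (Msg and gate are illustrative names, not from the gist): the promise resolves with the first message, or, when checkLicenseUpdate is set, only with the first message flagged as a license update.

```scala
import scala.concurrent.Promise

// Illustrative stand-in for ProductStrmgMessage's license-update flag.
case class Msg(isLicenseUpdate: Boolean)

// Feed a sequence of messages through the same gating used in onNext and
// return the message (if any) that resolved the promise.
def gate(checkLicenseUpdate: Boolean, messages: Seq[Msg]): Option[Msg] = {
  val p = Promise[Msg]()
  messages.foreach { m =>
    if (!checkLicenseUpdate || m.isLicenseUpdate) p.trySuccess(m) // only the first call wins
  }
  p.future.value.map(_.get) // None if the promise was never resolved
}

println(gate(checkLicenseUpdate = true, Seq(Msg(false), Msg(true))))  // Some(Msg(true))
println(gate(checkLicenseUpdate = false, Seq(Msg(false), Msg(true)))) // Some(Msg(false))
```

Because trySuccess only wins once, later messages are ignored; with checkLicenseUpdate set, messages without the flag never resolve the promise at all.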
// the connect builder is called like this
val hwIdFeeder = (1 to nbProducts).toStream.map(i => Map("hwid" -> ("perf" + i.toString))).toIterator
val productsScenario = scenario("Products connect and disconnect after 1 minute")
.feed(hwIdFeeder)
.exec(ProductsServiceGrpc.connect("hwid", token))
// the grpc proto rpc method looks like this
// rpc connect (stream ProductConnectionReq) returns (stream ProductStrmgMessage);
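As a side note, the feeder above is a plain iterator of session maps. A minimal standalone sketch (nbProducts is assumed, here 3) shows what each virtual user pulls:

```scala
// Standalone model of hwIdFeeder: each virtual user takes the next map, and
// "hwid" becomes a session attribute usable as $("hwid") in the scenario.
val nbProducts = 3 // assumed value for illustration
val hwIdFeeder = (1 to nbProducts).iterator.map(i => Map("hwid" -> ("perf" + i)))

println(hwIdFeeder.next()) // Map(hwid -> perf1)
println(hwIdFeeder.next()) // Map(hwid -> perf2)
```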

unludo commented Sep 22, 2020

@UserNoOne If you want to have parallel calls, you need to create a simulation made of several scenarios. Something like this:

val scenario1 = scenario(...
val scenario2 = scenario(...

setUp(
  scenario1.inject(... ),
  scenario2.inject(... ))

The scenarios will run in parallel.


UserNoOne commented Sep 23, 2020

Thanks @unludo for the quick response. I have added a non-blocking server-stream call, but I am seeing an issue:

   exec(grpc("Server Stream Request")
      .service(ServerStreamerGrpc.stub)
      .rpc(service => { req: StreamRequest =>
        val dT = Calendar.getInstance() // added: dT was not defined in the original snippet
        val p = Promise[StreamResponse]()
        // Note: the observer must be typed on the *response* message.
        val responseObserver = new StreamObserver[StreamResponse] {

          override def onNext(value: StreamResponse): Unit = {
            println(s"[${dT.getTime()}] ==> onNext " + value.requestId)
            p.trySuccess(value)
          }

          override def onError(throwable: Throwable): Unit = {
            println(s"[${dT.getTime()}] ==> onError")
            p.tryFailure(throwable)
          }

          override def onCompleted(): Unit = {
            println(s"[${dT.getTime()}] ==> onCompleted")
          }
        }
        service.serverStream(req, responseObserver)
        p.future
      })(StreamRequest.defaultInstance.withRequestId(10L))
The issue: whenever the initial response from the server stream is "grpc-status: 0, content-length: 0, endStream=true", the Gatling actor hangs and the simulation is not processed further.


unludo commented Sep 23, 2020

@UserNoOne In this case, onCompleted fires, right? Then the future is completed and the step is finished. Does the server side you are testing end the stream? It seems that way. You need to keep the stream alive, so if the server ends it, you can't go further. Can you change this behavior?
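One way to model the reported hang with a plain Promise, independent of gRPC (a hypothetical sketch, names assumed): the rpc future is only resolved in onNext/onError, so a stream that ends with zero messages fires only onCompleted and leaves the future pending. An assumed workaround is to resolve the promise in onCompleted as well:

```scala
import scala.concurrent.Promise

// The promise stands in for the future gatling-grpc awaits for the step.
val p = Promise[String]()

// On an empty stream neither of these ever runs, so p.future stays pending
// and the virtual user never finishes the step.
def onNext(v: String): Unit = { p.trySuccess(v); () }
def onError(t: Throwable): Unit = { p.tryFailure(t); () }

// Assumed workaround: also resolve the promise here so an empty stream
// still finishes the Gatling step (as a failure) instead of hanging.
def onCompleted(): Unit = {
  p.tryFailure(new IllegalStateException("stream completed with no data"))
  ()
}

onCompleted()
println(p.future.isCompleted) // true
```

Whether the step should count as KO or OK in that case is a design choice; trySuccess with a sentinel value would mark it OK instead.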

@UserNoOne

Yes @unludo, onCompleted is getting fired. Unfortunately, my server is a black box (I have no information apart from the proto files). What changes do I need to make on my client end, i.e. the Gatling end?

@UserNoOne

In normal cases, the client receives many responses from the server, the stream ends with either onCompleted or onError, and everything closes cleanly. Only in the case mentioned above does the simulation hang and stop processing.


unludo commented Sep 23, 2020

@UserNoOne If onCompleted fires, it may also be that the server drops the connection because of a network issue. Maybe it requires some kind of keepalive.


UserNoOne commented Sep 23, 2020

I tried adding a keepalive connection on the client end, but it seems something else is going wrong.
Even if the server closes the connection, normal client operation shouldn't be impacted, AFAIK, since onCompleted already fired.

@UserNoOne

This is what happens in that case:

10:09:48.562 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}

10:09:48.565 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041

10:09:48.574 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=250, INITIAL_WINDOW_SIZE=1048576, MAX_FRAME_SIZE=1048576, MAX_HEADER_LIST_SIZE=1048896}

10:09:48.576 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=true

10:09:48.609 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: example.com:443, :path: /ServerStreamer/serverStream, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false

10:09:48.617 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND DATA: streamId=3 padding=0 endStream=true length=7 bytes=00000000020809

10:09:48.881 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041

10:09:48.883 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=true

10:09:48.931 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7

10:09:48.948 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 0, content-length: 0, date: Wed, 23 Sep 2020 04:39:48 GMT] padding=0 endStream=true

[1600835986150]==================>onCompleted!

And after ~5 minutes, I observed this debug log:

10:12:48.978 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2] INBOUND GO_AWAY: lastStreamId=3 errorCode=0 length=0 bytes=

@UserNoOne

I verified the server logs as well; the issue appears on the Gatling/client end only when "no data" is received from serverStream.

Below is the normal flow, when everything goes okay:

15:35:42.711 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  OUTBOUND HEADERS: streamId=13 
headers=GrpcHttp2OutboundHeaders[:authority: example:443, :path: /ServerStreamer/serverStream, :method: POST, 
:scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] 
streamDependency=0 weight=16 exclusive=false padding=0 endStream=false


15:35:42.713 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  OUTBOUND DATA: streamId=13 padding=0 endStream=true length=7 bytes=0000000002081e

15:35:43.481 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7

15:35:43.714 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders
[:status: 200, content-type: application/grpc, grpc-accept-encoding: gzip, grpc-encoding: identity, date: Wed, 23 Sep 2020 10:05:43 GMT] 
padding=0 endStream=false

15:35:43.715 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f6bbacfb051080c999ac03


15:35:44.702 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f7bbacfb0510c0bcfeec01

15:35:44.827 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=20 bytes=000000000f081e120b08f8bbacfb0510808cfb29

15:35:44.828 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] padding=0 endStream=true

[1600855542707]==================>onCompleted! 
