Skip to content

Instantly share code, notes, and snippets.

@unludo
Created February 26, 2020 15:39
Show Gist options
  • Save unludo/299ddd977fb92699c233529877e97565 to your computer and use it in GitHub Desktop.
Save unludo/299ddd977fb92699c233529877e97565 to your computer and use it in GitHub Desktop.
bidir stream example for gatling-grpc - below 0.8 version
def connect(productIdFieldName: String, token: String, checkLicenseUpdate:Boolean = false): GrpcCallActionBuilder[ProductConnectionReq, ProductStrmgMessage] = {
val dT = Calendar.getInstance()
grpc("connectReq")
.service(ProductServiceGrpc.stub)
.rpc(service => { req: ProductConnectionReq =>
val p = Promise[ProductStrmgMessage]()
val requestObserver = service.connect(
new ClientResponseObserver[ProductConnectionReq, ProductStrmgMessage] {
var requestStream: ClientCallStreamObserver[ProductConnectionReq] = _
override def beforeStart(requestStream: ClientCallStreamObserver[ProductConnectionReq]): Unit = {
this.requestStream = requestStream
}
override def onNext(value: ProductStrmgMessage): Unit = {
println(s"[${dT.getTime()}]==================================================>Next " + value.data.isLicenseUpdate)
if (!checkLicenseUpdate) {
p.trySuccess(value)
}
if(checkLicenseUpdate && value.data.isLicenseUpdate){
p.trySuccess(value)
}
}
override def onError(throwable: Throwable): Unit = {
if (p != null) {
p.tryFailure(throwable)
}
}
override def onCompleted(): Unit = {
println(s"[${dT.getTime()}]==================>Completed!")
}
})
requestObserver.onNext(req)
p.future
})(ProductConnectionReq.defaultInstance.withLastpublicip("192.168.1.9"))
.header(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))(token)
.header(Metadata.Key.of("productId", Metadata.ASCII_STRING_MARSHALLER))($(productIdFieldName))
// called with this
val hwIdFeeder = (1 to nbProducts).toStream.map(i => Map("hwid" -> ("perf" + i.toString))).toIterator
val productsScenario = scenario("Products connect and disconnect after 1 minute")
.feed(hwIdFeeder)
.exec(ProductsServiceGrpc.connect("hwid", token))
// the grpc proto rpc method looks like this
// rpc connect (stream ProductConnectionReq) returns (stream ProductStrmgMessage);
@UserNoOne
Copy link

this is what happening in that case

0:09:48.562 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}

10:09:48.565 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041

10:09:48.574 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=250, INITIAL_WINDOW_SIZE=1048576, MAX_FRAME_SIZE=1048576, MAX_HEADER_LIST_SIZE=1048896}

10:09:48.576 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND SETTINGS: ack=true

10:09:48.609 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: example.com:443, :path: /ServerStreamer/serverStream, :method: POST, :scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false

10:09:48.617 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, OUTBOUND DATA: streamId=3 padding=0 endStream=true length=7 bytes=00000000020809

10:09:48.881 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041

10:09:48.883 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND SETTINGS: ack=true

10:09:48.931 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7

10:09:48.948 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2, INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 0, content-length: 0, date: Wed, 23 Sep 2020 04:39:48 GMT] padding=0 endStream=true

[1600835986150]==================>onCompleted!

And after a duration of ~5 mins observed this random debug log

10:12:48.978 [DEBUG] i.g.n.NettyClientHandler - [id: 0xdbd69cd2] INBOUND GO_AWAY: lastStreamId=3 errorCode=0 length=0 bytes=

@UserNoOne
Copy link

verified server logs as well, seeing issue in gatling/client end only when "no data" received from serverStream

Below is the normal flow when everything goes okay

15:35:42.711 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  OUTBOUND HEADERS: streamId=13 
headers=GrpcHttp2OutboundHeaders[:authority: example:443, :path: /ServerStreamer/serverStream, :method: POST, 
:scheme: https, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.29.0, grpc-accept-encoding: gzip] 
streamDependency=0 weight=16 exclusive=false padding=0 endStream=false


15:35:42.713 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  OUTBOUND DATA: streamId=13 padding=0 endStream=true length=7 bytes=0000000002081e

15:35:43.481 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=7

15:35:43.714 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders
[:status: 200, content-type: application/grpc, grpc-accept-encoding: gzip, grpc-encoding: identity, date: Wed, 23 Sep 2020 10:05:43 GMT] 
padding=0 endStream=false

15:35:43.715 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda,  INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f6bbacfb051080c999ac03


15:35:44.702 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=21 bytes=0000000010081e120c08f7bbacfb0510c0bcfeec01

15:35:44.827 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND DATA: streamId=13 padding=0 endStream=false length=20 bytes=000000000f081e120b08f8bbacfb0510808cfb29

15:35:44.828 [DEBUG] i.g.n.NettyClientHandler - [id: 0x70d7beda, ] INBOUND HEADERS: streamId=13 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] padding=0 endStream=true

[1600855542707]==================>onCompleted! 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment