Last active
July 25, 2025 03:51
-
-
Save wchargin/9934394d50185f62e49188d2268da937 to your computer and use it in GitHub Desktop.
repro for Dart HTTP/2 issue: client RST_STREAM forgets stream ID and destroys connection on incoming server frame
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### | |
# gRPC server, in Go | |
### | |
FROM golang:1.24.5-alpine AS server | |
WORKDIR /root | |
# git is required to clone and patch the grpc-go sources below. | |
RUN apk add --no-cache git | |
# Shallow-clone the pinned grpc-go release from the canonical upstream host. | |
RUN git clone --branch v1.74.2 --depth 1 https://github.com/grpc/grpc-go | |
# Patch the helloworld greeter so SayHello logs each call and then either | |
# returns ctx.Err() once the context is done or replies after 100 ms. | |
# The Printf arguments are ordered to match the format string: %q <- name, | |
# %v <- ctx. | |
RUN git -C grpc-go apply <<'END_OF_PATCH' | |
diff --git a/examples/helloworld/greeter_server/main.go b/examples/helloworld/greeter_server/main.go | |
index f7ccf1c..d6bc60d 100644 | |
--- a/examples/helloworld/greeter_server/main.go | |
+++ b/examples/helloworld/greeter_server/main.go | |
@@ -25,6 +25,7 @@ import ( | |
"fmt" | |
"log" | |
"net" | |
+ "time" | |
"google.golang.org/grpc" | |
pb "google.golang.org/grpc/examples/helloworld/helloworld" | |
@@ -40,9 +41,17 @@ type server struct { | |
} | |
// SayHello implements helloworld.GreeterServer | |
-func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { | |
- log.Printf("Received: %v", in.GetName()) | |
- return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil | |
+func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { | |
+ name := in.GetName() | |
+ log.Printf("SayHello(%q): ctx = %v\n", name, ctx) | |
+ select { | |
+ case <-ctx.Done(): | |
+ fmt.Printf("context closed! for %q\n", in.GetName()) | |
+ return nil, ctx.Err() | |
+ case <-time.After(100 * time.Millisecond): | |
+ fmt.Printf("waited the time. returning the err. for %q\n", in.GetName()) | |
+ return &pb.HelloReply{Message: fmt.Sprintf("Hello, %q!", name)}, nil | |
+ } | |
} | |
func main() { | |
END_OF_PATCH | |
# Compile the patched greeter server to /root/grpc-server. | |
RUN cd grpc-go/examples/helloworld && go build -o /root/grpc-server ./greeter_server/main.go | |
### | |
# gRPC client, in Dart | |
### | |
FROM dart:3.8.2 | |
WORKDIR /root | |
# Bring over the server binary built in the "server" stage. | |
COPY --from=server /root/grpc-server /root/grpc-server | |
# Shallow-clone pinned upstream sources from the canonical host: grpc-dart for | |
# the client, and dart-lang/http for a local, patchable copy of pkgs/http2. | |
RUN git clone --branch v3.2.2 --depth 1 https://github.com/grpc/grpc-dart | |
RUN git clone --branch http2-v2.3.1 --depth 1 https://github.com/dart-lang/http /root/dart-http | |
# Patch the grpc-dart helloworld example: print connection-state changes, | |
# issue sayHello calls in an endless loop with a 100 ms per-call timeout, and | |
# point the http2 dependency at the local checkout under /root/dart-http. | |
# NOTE(review): the 100 ms client timeout equals the patched server delay, | |
# presumably so the deadline races the reply -- confirm against the issue. | |
RUN git -C grpc-dart apply <<'END_OF_PATCH' | |
diff --git a/example/helloworld/bin/client.dart b/example/helloworld/bin/client.dart | |
index 4d4635d..87a2e2d 100644 | |
--- a/example/helloworld/bin/client.dart | |
+++ b/example/helloworld/bin/client.dart | |
@@ -27,18 +27,32 @@ Future<void> main(List<String> args) async { | |
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), | |
), | |
); | |
+ channel.onConnectionStateChanged.listen((event) { | |
+ print('changed! $event'); | |
+ }); | |
final stub = GreeterClient(channel); | |
- final name = args.isNotEmpty ? args[0] : 'world'; | |
+ int callIndex = 1; | |
+ while (true) { | |
+ String name = 'short-$callIndex'; | |
+ print(""); | |
+ print("making short call"); | |
+ await call(stub, name); | |
+ callIndex++; | |
+ } | |
+ await channel.shutdown(); | |
+} | |
+ | |
+Future<void> call(GreeterClient stub, String name) async { | |
+ CallOptions opts = CallOptions(timeout: const Duration(milliseconds: 100)); | |
try { | |
final response = await stub.sayHello( | |
HelloRequest()..name = name, | |
- options: CallOptions(compression: const GzipCodec()), | |
+ options: opts, | |
); | |
- print('Greeter client received: ${response.message}'); | |
+ print('${name}: Greeter client received: ${response.message}'); | |
} catch (e) { | |
- print('Caught error: $e'); | |
+ print('${name}: Caught error: $e'); | |
} | |
- await channel.shutdown(); | |
} | |
diff --git a/pubspec.yaml b/pubspec.yaml | |
index 0129f38..4656cb5 100644 | |
--- a/pubspec.yaml | |
+++ b/pubspec.yaml | |
@@ -16,7 +16,8 @@ dependencies: | |
googleapis_auth: ^1.1.0 | |
meta: ^1.3.0 | |
http: '>=0.13.0 <2.0.0' | |
- http2: ^2.0.0 | |
+ http2: | |
+ path: /root/dart-http/pkgs/http2 | |
protobuf: '>=2.0.0 <4.0.0' | |
dev_dependencies: | |
END_OF_PATCH | |
# Instrument the local http2 package with debug print statements (mostly | |
# tagged 'wctest:') around incoming frame handling, RST_STREAM writes, stream | |
# removal from _openStreams, and forced connection termination. The patch | |
# adds only print calls (plus one closure wrapper so a print can precede | |
# _handleFrameImpl); no other statements are changed. | |
RUN git -C dart-http apply <<'END_OF_PATCH' | |
diff --git a/pkgs/http2/lib/src/connection.dart b/pkgs/http2/lib/src/connection.dart | |
index 4e52e57..2b63ecf 100644 | |
--- a/pkgs/http2/lib/src/connection.dart | |
+++ b/pkgs/http2/lib/src/connection.dart | |
@@ -164,3 +164,6 @@ abstract class Connection { | |
_frameReaderSubscription = incomingFrames.listen((Frame frame) { | |
- _catchProtocolErrors(() => _handleFrameImpl(frame)); | |
+ _catchProtocolErrors(() { | |
+ print('wctest: processing frame: $frame'); | |
+ return _handleFrameImpl(frame); | |
+ }); | |
}, onError: (error, stack) { | |
@@ -302,2 +305,3 @@ abstract class Connection { | |
} on ProtocolException catch (error) { | |
+ print('_catchProtocolErrors got PROTOCOL_ERROR: $error'); | |
_terminate(ErrorCode.PROTOCOL_ERROR, message: '$error'); | |
@@ -433,2 +437,4 @@ abstract class Connection { | |
// Close all streams & stream queues | |
+ print('wctest: forcefully terminating!!! stack trace: ${StackTrace.current}'); | |
+ | |
_streams.terminate(exception); | |
diff --git a/pkgs/http2/lib/src/streams/stream_handler.dart b/pkgs/http2/lib/src/streams/stream_handler.dart | |
index 92a228d..0ebb822 100644 | |
--- a/pkgs/http2/lib/src/streams/stream_handler.dart | |
+++ b/pkgs/http2/lib/src/streams/stream_handler.dart | |
@@ -337,2 +337,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
if (stream.state == StreamState.HalfClosedLocal) { | |
+ print('wctest: streamQueueIn.onCancel.then callback for id=${streamId}'); | |
stream.outgoingQueue | |
@@ -411,2 +412,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
stream.state == StreamState.ReservedRemote) { | |
+ print('wctest: _terminateStream: client writing RST_STREAM for id=${stream.id}'); | |
_frameWriter.writeRstStreamFrame(stream.id, ErrorCode.CANCEL); | |
@@ -480,2 +482,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
} on StreamClosedException catch (exception) { | |
+ print('wctest: processStreamFrame catch StreamClosedException: client writing RST_STREAM for id=${exception.streamId}'); | |
_frameWriter.writeRstStreamFrame( | |
@@ -484,2 +487,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
} on StreamException catch (exception) { | |
+ print('wctest: processStreamFrame catch StreamException: client writing RST_STREAM for id=${exception.streamId}'); | |
_frameWriter.writeRstStreamFrame( | |
@@ -520,2 +524,7 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
var stream = _openStreams[frame.header.streamId]; | |
+ | |
+ if (frame is HeadersFrame) { | |
+ print('wctest: http2...stream_handler: _processStreamFrameInternal: frame is HeadersFrame; streamId=${frame.header.streamId}; stream=$stream'); | |
+ } | |
+ | |
if (stream == null) { | |
@@ -547,2 +556,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
} else { | |
+ print('wctest: complaining about headers frame: stack: ${StackTrace.current}'); | |
// A server cannot open new streams to the client. The only way | |
@@ -696,2 +706,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
// (e.g. MessageQueues which are not empty yet). | |
+ print('wctest: _handleEndOfStreamRemote: removing streamId: ${stream.id}'); | |
_openStreams.remove(stream.id); | |
@@ -716,2 +727,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
} | |
+ print('wctest: stream_handler.dart: _sendHeaders: id=${stream.id}; headers=$headers; endStream=$endStream'); | |
@@ -745,2 +757,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
void _endStream(Http2StreamImpl stream) { | |
+ print('wctest: stream_handler: _endStream: id=${stream.id}'); | |
if (stream.state == StreamState.Open) { | |
@@ -763,2 +776,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin { | |
incomingQueue.removeStreamMessageQueue(stream.id); | |
+ print('wctest: _cleanupClosedStream: removing streamId: ${stream.id}'); | |
_openStreams.remove(stream.id); | |
diff --git a/pkgs/http2/lib/transport.dart b/pkgs/http2/lib/transport.dart | |
index 87a10f6..c881c1c 100644 | |
--- a/pkgs/http2/lib/transport.dart | |
+++ b/pkgs/http2/lib/transport.dart | |
@@ -86,2 +86,3 @@ abstract class ClientTransportConnection extends TransportConnection { | |
{ClientSettings? settings}) { | |
+ print('wctest: http2:lib/transport.dart ClientTransportConnection.viaStreams :D'); | |
settings ??= const ClientSettings(); | |
END_OF_PATCH | |
# Switch into the patched helloworld example and fetch its Dart packages. | |
WORKDIR /root/grpc-dart/example/helloworld/ | |
RUN dart pub get | |
# Generate the ./run launcher: start the server in the background, give it a | |
# second to come up, then run the Dart client in the foreground. | |
# The heredoc delimiter is quoted so that $! and $pid are written literally | |
# into the script; unquoted, the build-time shell would expand both to empty | |
# strings and the EXIT trap would never kill the server. | |
RUN cat >./run <<'EOF' && chmod +x ./run | |
#!/bin/sh | |
printf >&2 'Starting server...\n' | |
/root/grpc-server >/tmp/server.log 2>&1 & | |
pid=$! | |
trap 'kill -9 $pid' EXIT | |
printf >&2 'Waiting a little bit for server...\n' | |
sleep 1 | |
printf >&2 'Starting client...\n' | |
dart run ./bin/client.dart | |
EOF | |
# Default command: execute the generated launcher script. |
CMD ["./run"] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
dart-lang/http#1799