Skip to content

Instantly share code, notes, and snippets.

@wchargin
Last active July 25, 2025 03:51
Show Gist options
  • Save wchargin/9934394d50185f62e49188d2268da937 to your computer and use it in GitHub Desktop.
Save wchargin/9934394d50185f62e49188d2268da937 to your computer and use it in GitHub Desktop.
repro for Dart HTTP/2 issue: client RST_STREAM forgets stream ID and destroys connection on incoming server frame
###
# gRPC server, in Go
###
FROM golang:1.24.5-alpine AS server
WORKDIR /root
RUN apk add git
RUN git clone --branch v1.74.2 --depth 1 https://github.com/grpc/grpc-go
RUN git -C grpc-go apply <<'END_OF_PATCH'
diff --git a/examples/helloworld/greeter_server/main.go b/examples/helloworld/greeter_server/main.go
index f7ccf1c..d6bc60d 100644
--- a/examples/helloworld/greeter_server/main.go
+++ b/examples/helloworld/greeter_server/main.go
@@ -25,6 +25,7 @@ import (
"fmt"
"log"
"net"
+ "time"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/helloworld/helloworld"
@@ -40,9 +41,17 @@ type server struct {
}
// SayHello implements helloworld.GreeterServer
-func (s *server) SayHello(_ context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
- log.Printf("Received: %v", in.GetName())
- return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
+func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
+ name := in.GetName()
+ log.Printf("SayHello(%q): ctx = %v\n", ctx, name)
+ select {
+ case <-ctx.Done():
+ fmt.Printf("context closed! for %q\n", in.GetName())
+ return nil, ctx.Err()
+ case <-time.After(100 * time.Millisecond):
+ fmt.Printf("waited the time. returning the err. for %q\n", in.GetName())
+ return &pb.HelloReply{Message: fmt.Sprintf("Hello, %q!", name)}, nil
+ }
}
func main() {
END_OF_PATCH
RUN go -C grpc-go/examples/helloworld build -o /root/grpc-server ./greeter_server/main.go
###
# gRPC client, in Dart
###
FROM dart:3.8.2
WORKDIR /root
COPY --from=server /root/grpc-server /root/grpc-server
RUN git clone --branch v3.2.2 --depth 1 https://github.com/grpc/grpc-dart
RUN git clone --branch http2-v2.3.1 --depth 1 https://github.com/dart-lang/http /root/dart-http
RUN git -C grpc-dart apply <<'END_OF_PATCH'
diff --git a/example/helloworld/bin/client.dart b/example/helloworld/bin/client.dart
index 4d4635d..87a2e2d 100644
--- a/example/helloworld/bin/client.dart
+++ b/example/helloworld/bin/client.dart
@@ -27,18 +27,32 @@ Future<void> main(List<String> args) async {
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
),
);
+ channel.onConnectionStateChanged.listen((event) {
+ print('changed! $event');
+ });
final stub = GreeterClient(channel);
- final name = args.isNotEmpty ? args[0] : 'world';
+ int callIndex = 1;
+ while (true) {
+ String name = 'short-$callIndex';
+ print("");
+ print("making short call");
+ await call(stub, name);
+ callIndex++;
+ }
+ await channel.shutdown();
+}
+
+Future<void> call(GreeterClient stub, String name) async {
+ CallOptions opts = CallOptions(timeout: const Duration(milliseconds: 100));
try {
final response = await stub.sayHello(
HelloRequest()..name = name,
- options: CallOptions(compression: const GzipCodec()),
+ options: opts,
);
- print('Greeter client received: ${response.message}');
+ print('${name}: Greeter client received: ${response.message}');
} catch (e) {
- print('Caught error: $e');
+ print('${name}: Caught error: $e');
}
- await channel.shutdown();
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 0129f38..4656cb5 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -16,7 +16,8 @@ dependencies:
googleapis_auth: ^1.1.0
meta: ^1.3.0
http: '>=0.13.0 <2.0.0'
- http2: ^2.0.0
+ http2:
+ path: /root/dart-http/pkgs/http2
protobuf: '>=2.0.0 <4.0.0'
dev_dependencies:
END_OF_PATCH
RUN git -C dart-http apply <<'END_OF_PATCH'
diff --git a/pkgs/http2/lib/src/connection.dart b/pkgs/http2/lib/src/connection.dart
index 4e52e57..2b63ecf 100644
--- a/pkgs/http2/lib/src/connection.dart
+++ b/pkgs/http2/lib/src/connection.dart
@@ -164,3 +164,6 @@ abstract class Connection {
_frameReaderSubscription = incomingFrames.listen((Frame frame) {
- _catchProtocolErrors(() => _handleFrameImpl(frame));
+ _catchProtocolErrors(() {
+ print('wctest: processing frame: $frame');
+ return _handleFrameImpl(frame);
+ });
}, onError: (error, stack) {
@@ -302,2 +305,3 @@ abstract class Connection {
} on ProtocolException catch (error) {
+ print('_catchProtocolErrors got PROTOCOL_ERROR: $error');
_terminate(ErrorCode.PROTOCOL_ERROR, message: '$error');
@@ -433,2 +437,4 @@ abstract class Connection {
// Close all streams & stream queues
+ print('wctest: forcefully terminating!!! stack trace: ${StackTrace.current}');
+
_streams.terminate(exception);
diff --git a/pkgs/http2/lib/src/streams/stream_handler.dart b/pkgs/http2/lib/src/streams/stream_handler.dart
index 92a228d..0ebb822 100644
--- a/pkgs/http2/lib/src/streams/stream_handler.dart
+++ b/pkgs/http2/lib/src/streams/stream_handler.dart
@@ -337,2 +337,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
if (stream.state == StreamState.HalfClosedLocal) {
+ print('wctest: streamQueueIn.onCancel.then callback for id=${streamId}');
stream.outgoingQueue
@@ -411,2 +412,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
stream.state == StreamState.ReservedRemote) {
+ print('wctest: _terminateStream: client writing RST_STREAM for id=${stream.id}');
_frameWriter.writeRstStreamFrame(stream.id, ErrorCode.CANCEL);
@@ -480,2 +482,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
} on StreamClosedException catch (exception) {
+ print('wctest: processStreamFrame catch StreamClosedException: client writing RST_STREAM for id=${exception.streamId}');
_frameWriter.writeRstStreamFrame(
@@ -484,2 +487,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
} on StreamException catch (exception) {
+ print('wctest: processStreamFrame catch StreamException: client writing RST_STREAM for id=${exception.streamId}');
_frameWriter.writeRstStreamFrame(
@@ -520,2 +524,7 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
var stream = _openStreams[frame.header.streamId];
+
+ if (frame is HeadersFrame) {
+ print('wctest: http2...stream_handler: _processStreamFrameInternal: frame is HeadersFrame; streamId=${frame.header.streamId}; stream=$stream');
+ }
+
if (stream == null) {
@@ -547,2 +556,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
} else {
+ print('wctest: complaining about headers frame: stack: ${StackTrace.current}');
// A server cannot open new streams to the client. The only way
@@ -696,2 +706,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
// (e.g. MessageQueues which are not empty yet).
+ print('wctest: _handleEndOfStreamRemote: removing streamId: ${stream.id}');
_openStreams.remove(stream.id);
@@ -716,2 +727,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
}
+ print('wctest: stream_handler.dart: _sendHeaders: id=${stream.id}; headers=$headers; endStream=$endStream');
@@ -745,2 +757,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
void _endStream(Http2StreamImpl stream) {
+ print('wctest: stream_handler: _endStream: id=${stream.id}');
if (stream.state == StreamState.Open) {
@@ -763,2 +776,3 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
incomingQueue.removeStreamMessageQueue(stream.id);
+ print('wctest: _cleanupClosedStream: removing streamId: ${stream.id}');
_openStreams.remove(stream.id);
diff --git a/pkgs/http2/lib/transport.dart b/pkgs/http2/lib/transport.dart
index 87a10f6..c881c1c 100644
--- a/pkgs/http2/lib/transport.dart
+++ b/pkgs/http2/lib/transport.dart
@@ -86,2 +86,3 @@ abstract class ClientTransportConnection extends TransportConnection {
{ClientSettings? settings}) {
+ print('wctest: http2:lib/transport.dart ClientTransportConnection.viaStreams :D');
settings ??= const ClientSettings();
END_OF_PATCH
WORKDIR grpc-dart/example/helloworld/
RUN dart pub get
RUN cat >./run <<EOF && chmod +x ./run
#!/bin/sh
printf >&2 'Starting server...\n'
/root/grpc-server >/tmp/server.log 2>&1 &
pid=$!
trap 'kill -9 $pid' EXIT
printf >&2 'Waiting a little bit for server...\n'
sleep 1
printf >&2 'Starting client...\n'
dart run ./bin/client.dart
EOF
CMD [ "./run" ]
@wchargin
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment