Skip to content

Instantly share code, notes, and snippets.

@manhdaovan
Last active June 13, 2019 02:25
Show Gist options
  • Save manhdaovan/853a8acaa203c0766051ba2fd9ba4fcc to your computer and use it in GitHub Desktop.
Save manhdaovan/853a8acaa203c0766051ba2fd9ba4fcc to your computer and use it in GitHub Desktop.
Example about GRPC cancellation: client calls context.cancel(), then the context in server side is canceled, too.
syntax = "proto3";
// EchoRequest is the request for echo.
message EchoRequest {
string message = 1;
}
// EchoResponse is the response for echo.
message EchoResponse {
string message = 1;
}
service NonStreamingCancel {
rpc Echo(EchoRequest) returns(EchoResponse) {}
}
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"google.golang.org/grpc"
pb "grpc/examples/proto/echo"
)
var addr = flag.String("addr", "localhost:50051", "the address to connect to")
func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := grpc.Dial(*addr, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewNonStreamingCancelClient(conn)
// Initiate the stream with a context that supports cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
go func() {
fmt.Println("cancelling context")
time.Sleep(1 * time.Second)
cancel()
}()
res, err := c.Echo(ctx, &pb.EchoRequest{Message: "echo from client"})
if err != nil {
log.Fatalf("error on Echo: %v", err)
}
fmt.Printf("response ------- %+v\n", res)
}
package main
import (
"context"
"flag"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"net"
"time"
"google.golang.org/grpc"
pb "grpc/examples/proto/echo"
)
var port = flag.Int("port", 50051, "the port to serve on")
type server struct{}
func (s *server) Echo(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
fmt.Println("msg from client: ", in.Message)
done := make(chan struct{})
go func() {
fmt.Println("start sleep")
time.Sleep(5 * time.Second)
done <- struct{}{}
fmt.Println("push to done channel, end sleep")
}()
select {
case <- ctx.Done():
fmt.Println("context done, return error")
return nil, status.Error(codes.Canceled, "context canceled on server side")
case <- done:
fmt.Println("get data from done channel")
return &pb.EchoResponse{Message: "echo from server"}, nil
}
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Printf("server listening at port %v\n", lis.Addr())
s := grpc.NewServer()
pb.RegisterNonStreamingCancelServer(s, &server{})
s.Serve(lis)
}

Log of client side

cancelling context
2019/06/12 11:33:01 error on Echo: rpc error: code = Canceled desc = context canceled
exit status 1

Log of server side

msg from client:  echo from client
start sleep
context done, return error

So what?

Base on log content, we can conclude that when the context in the client side is canceled, the context in server side is canceled, too.

But how?

In short, the flow is like below:

  • Both the client and server have a different context, and the client listens to its own context.
  • When the context of the client is canceled by calling cancel(), the client sends an error with Canceled code to the server.
  • The server receives this error code, then call cancel() on the context of the server.
    • And if the server listens to its own context, you can stop the current action as the code above.

You can dig deeper into the source code by starting at https://github.com/grpc/grpc-go/blob/684ef046099f3f1c4b1fe762e5826a2f7bc6e27f/internal/transport/http2_client.go#L975

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment