-
-
Save abatilo/1755c0ec2d26c630c522fcb3455794b1 to your computer and use it in GitHub Desktop.
Example of graceful shutdown with a gRPC health server and an HTTP server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
readinessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 1
livenessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 2
imagePullPolicy: IfNotPresent
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
stdlog "log" | |
"net" | |
"net/http" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/go-kit/kit/log/level" | |
"github.com/namsral/flag" | |
"golang.org/x/sync/errgroup" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/health" | |
healthpb "google.golang.org/grpc/health/grpc_health_v1" | |
"google.golang.org/grpc/keepalive" | |
) | |
const serviceName = "myserviced" | |
var ( | |
version = "no version" | |
httpPort = flag.Int("httpPort", 8888, "http port") | |
grpcPort = flag.Int("grpcPort", 9200, "grpc port") | |
healthPort = flag.Int("healthPort", 6666, "grpc health port") | |
logLevel = flag.String("logLevel", "INFO", "Log level, INFO|WARNING|DEBUG|ERROR") | |
gcpProjectID = flag.String("gcpProjectID", "none", "GCP Project ID") | |
dev = flag.Bool("dev", false, "Enable development tooling") | |
grpcServer *grpc.Server | |
grpcHealthServer *grpc.Server | |
httpServer *http.Server | |
) | |
func main() { | |
flag.Parse() | |
logger, err := probe.InitLogger(serviceName, *logLevel) | |
if err != nil { | |
stdlog.Fatal("can't init logger", err) | |
} | |
level.Info(logger).Log("msg", "Starting app", "version", version) | |
ctx := context.Background() | |
ctx, cancel := context.WithCancel(ctx) | |
defer cancel() | |
interrupt := make(chan os.Signal, 1) | |
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) | |
defer signal.Stop(interrupt) | |
g, ctx := errgroup.WithContext(ctx) | |
// web server metrics | |
g.Go(func() error { | |
httpServer = &http.Server{ | |
Addr: fmt.Sprintf(":%d", *httpPort), | |
ReadTimeout: 10 * time.Second, | |
WriteTimeout: 10 * time.Second, | |
} | |
level.Info(logger).Log("msg", fmt.Sprintf("HTTP Metrics server serving at :%d", *httpPort)) | |
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed { | |
return err | |
} | |
return nil | |
}) | |
// gRPC Health Server | |
healthServer := health.NewServer() | |
g.Go(func() error { | |
grpcHealthServer = grpc.NewServer() | |
healthpb.RegisterHealthServer(grpcHealthServer, healthServer) | |
haddr := fmt.Sprintf(":%d", *healthPort) | |
hln, err := net.Listen("tcp", haddr) | |
if err != nil { | |
level.Error(logger).Log("msg", "gRPC Health server: failed to listen", "error", err) | |
os.Exit(2) | |
} | |
level.Info(logger).Log("msg", fmt.Sprintf("gRPC health server serving at %s", haddr)) | |
return grpcHealthServer.Serve(hln) | |
}) | |
// gRPC server | |
g.Go(func() error { | |
addr := fmt.Sprintf(":%d", *grpcPort) | |
ln, err := net.Listen("tcp", addr) | |
if err != nil { | |
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err) | |
os.Exit(2) | |
} | |
server := &myservice.Server{ | |
Logger: logger, | |
Health: healthServer, | |
ServiceName: serviceName, | |
} | |
grpcServer = grpc.NewServer( | |
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing | |
// MaxConnectionAgeGrace will torn them, default to infinity | |
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}), | |
grpc.StatsHandler(&ocgrpc_propag.ServerHandler{}), | |
) | |
myservice.RegisterNyServiceServer(grpcServer, server) | |
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr)) | |
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING) | |
return grpcServer.Serve(ln) | |
}) | |
select { | |
case <-interrupt: | |
break | |
case <-ctx.Done(): | |
break | |
} | |
level.Warn(logger).Log("msg", "received shutdown signal") | |
cancel() | |
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING) | |
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer shutdownCancel() | |
if httpServer != nil { | |
_ = httpServer.Shutdown(shutdownCtx) | |
} | |
if grpcServer != nil { | |
grpcServer.GracefulStop() | |
} | |
if grpcHealthServer != nil { | |
grpcHealthServer.GracefulStop() | |
} | |
err = g.Wait() | |
if err != nil { | |
level.Error(logger).Log("msg", "server returning an error", "error", err) | |
os.Exit(2) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment