Created
February 17, 2021 03:56
-
-
Save PaulFurtado/763700624dfcb75fdcccb1a305794fb5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go | |
index c24fbc6921c..e499f2a18e0 100644 | |
--- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go | |
+++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go | |
@@ -30,6 +30,7 @@ import ( | |
"path" | |
"strconv" | |
"strings" | |
+ "time" | |
"golang.org/x/net/http2" | |
"k8s.io/klog" | |
@@ -121,13 +122,61 @@ func SetTransportDefaults(t *http.Transport) *http.Transport { | |
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 { | |
klog.Infof("HTTP2 has been explicitly disabled") | |
} else if allowsHTTP2(t) { | |
- if err := http2.ConfigureTransport(t); err != nil { | |
+ if err := configureHTTP2Transport(t); err != nil { | |
klog.Warningf("Transport failed http2 configuration: %v", err) | |
} | |
} | |
return t | |
} | |
+func readIdleTimeoutSeconds() int { | |
+ ret := 30 | |
+ // User can set the readIdleTimeout to 0 to disable the HTTP/2 | |
+ // connection health check. | |
+ if s := os.Getenv("HTTP2_READ_IDLE_TIMEOUT_SECONDS"); len(s) > 0 { | |
+ i, err := strconv.Atoi(s) | |
+ if err != nil { | |
+ klog.Warningf("Illegal HTTP2_READ_IDLE_TIMEOUT_SECONDS(%q): %v."+ | |
+ " Default value %d is used", s, err, ret) | |
+ return ret | |
+ } | |
+ ret = i | |
+ } | |
+ return ret | |
+} | |
+ | |
+func pingTimeoutSeconds() int { | |
+ ret := 15 | |
+ if s := os.Getenv("HTTP2_PING_TIMEOUT_SECONDS"); len(s) > 0 { | |
+ i, err := strconv.Atoi(s) | |
+ if err != nil { | |
+ klog.Warningf("Illegal HTTP2_PING_TIMEOUT_SECONDS(%q): %v."+ | |
+ " Default value %d is used", s, err, ret) | |
+ return ret | |
+ } | |
+ ret = i | |
+ } | |
+ return ret | |
+} | |
+ | |
+func configureHTTP2Transport(t *http.Transport) error { | |
+ t2, err := http2.ConfigureTransports(t) | |
+ if err != nil { | |
+ return err | |
+ } | |
+ // The following enables the HTTP/2 connection health check added in | |
+ // https://github.com/golang/net/pull/55. The health check detects and | |
+ // closes broken transport layer connections. Without the health check, | |
+ // a broken connection can linger too long, e.g., a broken TCP | |
+ // connection will be closed by the Linux kernel after 13 to 30 minutes | |
+ // by default, which caused | |
+ // https://github.com/kubernetes/client-go/issues/374 and | |
+ // https://github.com/kubernetes/kubernetes/issues/87615. | |
+ t2.ReadIdleTimeout = time.Duration(readIdleTimeoutSeconds()) * time.Second | |
+ t2.PingTimeout = time.Duration(pingTimeoutSeconds()) * time.Second | |
+ return nil | |
+} | |
+ | |
func allowsHTTP2(t *http.Transport) bool { | |
if t.TLSClientConfig == nil || len(t.TLSClientConfig.NextProtos) == 0 { | |
// the transport expressed no NextProto preference, allow | |
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go | |
index 142b80f1a84..c057b99530e 100644 | |
--- a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go | |
+++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go | |
@@ -492,3 +492,39 @@ func TestAllowsHTTP2(t *testing.T) { | |
}) | |
} | |
} | |
+ | |
+func setEnv(key, value string) func() { | |
+ originalValue := os.Getenv(key) | |
+ os.Setenv(key, value) | |
+ return func() { | |
+ os.Setenv(key, originalValue) | |
+ } | |
+} | |
+ | |
+func TestReadIdleTimeoutSeconds(t *testing.T) { | |
+ reset := setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "60") | |
+ if e, a := 60, readIdleTimeoutSeconds(); e != a { | |
+ t.Errorf("expected %d, got %d", e, a) | |
+ } | |
+ reset() | |
+ | |
+ reset = setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "illegal value") | |
+ if e, a := 30, readIdleTimeoutSeconds(); e != a { | |
+ t.Errorf("expected %d, got %d", e, a) | |
+ } | |
+ reset() | |
+} | |
+ | |
+func TestPingTimeoutSeconds(t *testing.T) { | |
+ reset := setEnv("HTTP2_PING_TIMEOUT_SECONDS", "60") | |
+ if e, a := 60, pingTimeoutSeconds(); e != a { | |
+ t.Errorf("expected %d, got %d", e, a) | |
+ } | |
+ reset() | |
+ | |
+ reset = setEnv("HTTP2_PING_TIMEOUT_SECONDS", "illegal value") | |
+ if e, a := 15, pingTimeoutSeconds(); e != a { | |
+ t.Errorf("expected %d, got %d", e, a) | |
+ } | |
+ reset() | |
+} | |
diff --git a/staging/src/k8s.io/client-go/rest/BUILD b/staging/src/k8s.io/client-go/rest/BUILD | |
index 61f77e28eca..1f37ef5867d 100644 | |
--- a/staging/src/k8s.io/client-go/rest/BUILD | |
+++ b/staging/src/k8s.io/client-go/rest/BUILD | |
@@ -11,6 +11,7 @@ go_test( | |
srcs = [ | |
"client_test.go", | |
"config_test.go", | |
+ "connection_test.go", | |
"plugin_test.go", | |
"request_test.go", | |
"url_utils_test.go", | |
@@ -25,12 +26,14 @@ go_test( | |
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", | |
+ "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", | |
+ "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", | |
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", | |
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", | |
"//staging/src/k8s.io/client-go/rest/watch:go_default_library", | |
diff --git a/staging/src/k8s.io/client-go/rest/connection_test.go b/staging/src/k8s.io/client-go/rest/connection_test.go | |
new file mode 100644 | |
index 00000000000..f6695b8cb14 | |
--- /dev/null | |
+++ b/staging/src/k8s.io/client-go/rest/connection_test.go | |
@@ -0,0 +1,161 @@ | |
+/* | |
+Copyright 2019 The Kubernetes Authors. | |
+Licensed under the Apache License, Version 2.0 (the "License"); | |
+you may not use this file except in compliance with the License. | |
+You may obtain a copy of the License at | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+*/ | |
+ | |
+package rest | |
+ | |
+import ( | |
+ "context" | |
+ "fmt" | |
+ "io" | |
+ "net" | |
+ "net/http" | |
+ "net/http/httptest" | |
+ "net/url" | |
+ "os" | |
+ "strconv" | |
+ "sync/atomic" | |
+ "testing" | |
+ "time" | |
+ | |
+ "k8s.io/apimachinery/pkg/runtime/schema" | |
+ "k8s.io/apimachinery/pkg/runtime/serializer" | |
+ utilnet "k8s.io/apimachinery/pkg/util/net" | |
+) | |
+ | |
+type tcpLB struct { | |
+ t *testing.T | |
+ ln net.Listener | |
+ serverURL string | |
+ dials int32 | |
+} | |
+ | |
+func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) { | |
+ out, err := net.Dial("tcp", lb.serverURL) | |
+ if err != nil { | |
+ lb.t.Log(err) | |
+ return | |
+ } | |
+ go io.Copy(out, in) | |
+ go io.Copy(in, out) | |
+ <-stopCh | |
+ if err := out.Close(); err != nil { | |
+ lb.t.Fatalf("failed to close connection: %v", err) | |
+ } | |
+} | |
+ | |
+func (lb *tcpLB) serve(stopCh chan struct{}) { | |
+ conn, err := lb.ln.Accept() | |
+ if err != nil { | |
+ lb.t.Fatalf("failed to accept: %v", err) | |
+ } | |
+ atomic.AddInt32(&lb.dials, 1) | |
+ go lb.handleConnection(conn, stopCh) | |
+} | |
+ | |
+func newLB(t *testing.T, serverURL string) *tcpLB { | |
+ ln, err := net.Listen("tcp", "127.0.0.1:0") | |
+ if err != nil { | |
+ t.Fatalf("failed to bind: %v", err) | |
+ } | |
+ lb := tcpLB{ | |
+ serverURL: serverURL, | |
+ ln: ln, | |
+ t: t, | |
+ } | |
+ return &lb | |
+} | |
+ | |
+func setEnv(key, value string) func() { | |
+ originalValue := os.Getenv(key) | |
+ os.Setenv(key, value) | |
+ return func() { | |
+ os.Setenv(key, originalValue) | |
+ } | |
+} | |
+ | |
+const ( | |
+ readIdleTimeout int = 1 | |
+ pingTimeout int = 1 | |
+) | |
+ | |
+func TestReconnectBrokenTCP(t *testing.T) { | |
+ defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))() | |
+ defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))() | |
+ defer setEnv("DISABLE_HTTP2", "")() | |
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
+ fmt.Fprintf(w, "Hello, %s", r.Proto) | |
+ })) | |
+ ts.EnableHTTP2 = true | |
+ ts.StartTLS() | |
+ defer ts.Close() | |
+ | |
+ u, err := url.Parse(ts.URL) | |
+ if err != nil { | |
+ t.Fatalf("failed to parse URL from %q: %v", ts.URL, err) | |
+ } | |
+ lb := newLB(t, u.Host) | |
+ defer lb.ln.Close() | |
+ stopCh := make(chan struct{}) | |
+ go lb.serve(stopCh) | |
+ transport, ok := ts.Client().Transport.(*http.Transport) | |
+ if !ok { | |
+ t.Fatalf("failed to assert *http.Transport") | |
+ } | |
+ config := &Config{ | |
+ Host: "https://" + lb.ln.Addr().String(), | |
+ Transport: utilnet.SetTransportDefaults(transport), | |
+ Timeout: 1 * time.Second, | |
+ // These fields are required to create a REST client. | |
+ ContentConfig: ContentConfig{ | |
+ GroupVersion: &schema.GroupVersion{}, | |
+ NegotiatedSerializer: &serializer.CodecFactory{}, | |
+ }, | |
+ } | |
+ client, err := RESTClientFor(config) | |
+ if err != nil { | |
+ t.Fatalf("failed to create REST client: %v", err) | |
+ } | |
+ data, err := client.Get().AbsPath("/").DoRaw(context.TODO()) | |
+ if err != nil { | |
+ t.Fatalf("unexpected err: %s: %v", data, err) | |
+ } | |
+ if string(data) != "Hello, HTTP/2.0" { | |
+ t.Fatalf("unexpected response: %s", data) | |
+ } | |
+ | |
+ // Deliberately let the LB stop proxying traffic for the current | |
+ // connection. This mimics a broken TCP connection that's not properly | |
+ // closed. | |
+ close(stopCh) | |
+ | |
+ stopCh = make(chan struct{}) | |
+ go lb.serve(stopCh) | |
+ // Sleep enough time for the HTTP/2 health check to detect and close | |
+ // the broken TCP connection. | |
+ time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second) | |
+ // If the HTTP/2 health check were disabled, the broken connection | |
+ // would still be in the connection pool, the following request would | |
+ // then reuse the broken connection instead of creating a new one, and | |
+ // thus would fail. | |
+ data, err = client.Get().AbsPath("/").DoRaw(context.TODO()) | |
+ if err != nil { | |
+ t.Fatalf("unexpected err: %v", err) | |
+ } | |
+ if string(data) != "Hello, HTTP/2.0" { | |
+ t.Fatalf("unexpected response: %s", data) | |
+ } | |
+ dials := atomic.LoadInt32(&lb.dials) | |
+ if dials != 2 { | |
+ t.Fatalf("expected %d dials, got %d", 2, dials) | |
+ } | |
+} | |
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go | |
index aeac7d8a51a..d0a35d53fd4 100644 | |
--- a/vendor/golang.org/x/net/http2/transport.go | |
+++ b/vendor/golang.org/x/net/http2/transport.go | |
@@ -108,6 +108,19 @@ type Transport struct { | |
// waiting for their turn. | |
StrictMaxConcurrentStreams bool | |
+ // ReadIdleTimeout is the timeout after which a health check using ping | |
+ // frame will be carried out if no frame is received on the connection. | |
+ // Note that a ping response is considered a received frame, so if | |
+ // there is no other traffic on the connection, the health check will | |
+ // be performed every ReadIdleTimeout interval. | |
+ // If zero, no health check is performed. | |
+ ReadIdleTimeout time.Duration | |
+ | |
+ // PingTimeout is the timeout after which the connection will be closed | |
+ // if a response to Ping is not received. | |
+ // Defaults to 15s. | |
+ PingTimeout time.Duration | |
+ | |
// t1, if non-nil, is the standard library Transport using | |
// this transport. Its settings are used (but not its | |
// RoundTrip method, etc). | |
@@ -131,14 +144,31 @@ func (t *Transport) disableCompression() bool { | |
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) | |
} | |
+func (t *Transport) pingTimeout() time.Duration { | |
+ if t.PingTimeout == 0 { | |
+ return 15 * time.Second | |
+ } | |
+ return t.PingTimeout | |
+ | |
+} | |
+ | |
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. | |
// It returns an error if t1 has already been HTTP/2-enabled. | |
+// | |
+// Use ConfigureTransports instead to configure the HTTP/2 Transport. | |
func ConfigureTransport(t1 *http.Transport) error { | |
- _, err := configureTransport(t1) | |
+ _, err := ConfigureTransports(t1) | |
return err | |
} | |
-func configureTransport(t1 *http.Transport) (*Transport, error) { | |
+// ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2. | |
+// It returns a new HTTP/2 Transport for further configuration. | |
+// It returns an error if t1 has already been HTTP/2-enabled. | |
+func ConfigureTransports(t1 *http.Transport) (*Transport, error) { | |
+ return configureTransports(t1) | |
+} | |
+ | |
+func configureTransports(t1 *http.Transport) (*Transport, error) { | |
connPool := new(clientConnPool) | |
t2 := &Transport{ | |
ConnPool: noDialClientConnPool{connPool}, | |
@@ -667,6 +697,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro | |
cc.inflow.add(transportDefaultConnFlow + initialWindowSize) | |
cc.bw.Flush() | |
if cc.werr != nil { | |
+ cc.Close() | |
return nil, cc.werr | |
} | |
@@ -674,6 +705,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro | |
return cc, nil | |
} | |
+func (cc *ClientConn) healthCheck() { | |
+ pingTimeout := cc.t.pingTimeout() | |
+ // We don't need to periodically ping in the health check, because the readLoop of ClientConn will | |
+ // trigger the healthCheck again if there is no frame received. | |
+ ctx, cancel := context.WithTimeout(context.Background(), pingTimeout) | |
+ defer cancel() | |
+ err := cc.Ping(ctx) | |
+ if err != nil { | |
+ cc.closeForLostPing() | |
+ cc.t.connPool().MarkDead(cc) | |
+ return | |
+ } | |
+} | |
+ | |
func (cc *ClientConn) setGoAway(f *GoAwayFrame) { | |
cc.mu.Lock() | |
defer cc.mu.Unlock() | |
@@ -834,14 +879,12 @@ func (cc *ClientConn) sendGoAway() error { | |
return nil | |
} | |
-// Close closes the client connection immediately. | |
-// | |
-// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead. | |
-func (cc *ClientConn) Close() error { | |
+// closeForError closes the client connection immediately. In-flight requests are interrupted. | |
+// err is sent to streams. | |
+func (cc *ClientConn) closeForError(err error) error { | |
cc.mu.Lock() | |
defer cc.cond.Broadcast() | |
defer cc.mu.Unlock() | |
- err := errors.New("http2: client connection force closed via ClientConn.Close") | |
for id, cs := range cc.streams { | |
select { | |
case cs.resc <- resAndError{err: err}: | |
@@ -854,6 +897,20 @@ func (cc *ClientConn) Close() error { | |
return cc.tconn.Close() | |
} | |
+// Close closes the client connection immediately. | |
+// | |
+// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead. | |
+func (cc *ClientConn) Close() error { | |
+ err := errors.New("http2: client connection force closed via ClientConn.Close") | |
+ return cc.closeForError(err) | |
+} | |
+ | |
+// closeForLostPing closes the client connection immediately. In-flight requests are interrupted. | |
+func (cc *ClientConn) closeForLostPing() error { | |
+ err := errors.New("http2: client connection lost") | |
+ return cc.closeForError(err) | |
+} | |
+ | |
const maxAllocFrameSize = 512 << 10 | |
// frameBuffer returns a scratch buffer suitable for writing DATA frames. | |
@@ -1021,6 +1078,15 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf | |
bodyWriter := cc.t.getBodyWriterState(cs, body) | |
cs.on100 = bodyWriter.on100 | |
+ defer func() { | |
+ cc.wmu.Lock() | |
+ werr := cc.werr | |
+ cc.wmu.Unlock() | |
+ if werr != nil { | |
+ cc.Close() | |
+ } | |
+ }() | |
+ | |
cc.wmu.Lock() | |
endStream := !hasBody && !hasTrailers | |
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) | |
@@ -1682,8 +1748,17 @@ func (rl *clientConnReadLoop) run() error { | |
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse | |
gotReply := false // ever saw a HEADERS reply | |
gotSettings := false | |
+ readIdleTimeout := cc.t.ReadIdleTimeout | |
+ var t *time.Timer | |
+ if readIdleTimeout != 0 { | |
+ t = time.AfterFunc(readIdleTimeout, cc.healthCheck) | |
+ defer t.Stop() | |
+ } | |
for { | |
f, err := cc.fr.ReadFrame() | |
+ if t != nil { | |
+ t.Reset(readIdleTimeout) | |
+ } | |
if err != nil { | |
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment