Created
February 8, 2020 21:43
-
-
Save olegch/b08a59406792441330b938ed11c79580 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"strconv" | |
"strings" | |
"github.com/pkg/errors" | |
v1 "k8s.io/api/core/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/util/httpstream" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/tools/portforward" | |
"k8s.io/client-go/transport/spdy" | |
) | |
func main() { | |
fmt.Println("K8S forwarder started") | |
// load config | |
configBytes, err := ioutil.ReadFile("/home/olegch/.kube/config") | |
fmt.Println(err) | |
if err != nil { | |
return | |
} | |
config, err := clientcmd.NewClientConfigFromBytes(configBytes) | |
fmt.Println(config, err) | |
if err != nil { | |
return | |
} | |
// create client set | |
clientConfig, err := config.ClientConfig() | |
fmt.Println(clientConfig, err) | |
if err != nil { | |
return | |
} | |
clientset, err := kubernetes.NewForConfig(clientConfig) | |
fmt.Println(clientset, err) | |
if err != nil { | |
return | |
} | |
// list pods | |
pl, err := clientset.CoreV1().Pods("default").List(metav1.ListOptions{}) | |
fmt.Println(pl, err) | |
if err != nil { | |
return | |
} | |
// get dialer | |
dl, err := dialer(clientConfig, clientset, pl.Items[0].ObjectMeta.Namespace, pl.Items[0].ObjectMeta.Name) | |
fmt.Println(dl, err) | |
if err != nil { | |
return | |
} | |
// upgrade connection | |
streamConn, str, err := dl.Dial(portforward.PortForwardProtocolV1Name) | |
fmt.Println(streamConn, str, err) | |
if err != nil { | |
return | |
} | |
defer streamConn.Close() | |
// wait for interrupt or conn closure | |
// select { | |
// case <-streamConn.CloseChan(): | |
// handle error | |
// } | |
requestID := 0 | |
requestID++ | |
// create error stream | |
headers := http.Header{} | |
headers.Set(v1.StreamType, v1.StreamTypeError) | |
headers.Set(v1.PortHeader, fmt.Sprintf("%d", 80)) | |
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID)) | |
errorStream, err := streamConn.CreateStream(headers) | |
fmt.Println(errorStream, err) | |
if err != nil { | |
return | |
} | |
// we're not writing to this stream | |
errorStream.Close() | |
// read from error stream | |
errorChan := make(chan error) | |
go func() { | |
message, err := ioutil.ReadAll(errorStream) | |
switch { | |
case err != nil: | |
errorChan <- fmt.Errorf("error reading from error stream: %v", err) | |
case len(message) > 0: | |
errorChan <- fmt.Errorf("an error occurred forwarding: %v", string(message)) | |
} | |
close(errorChan) | |
}() | |
// create data stream | |
headers.Set(v1.StreamType, v1.StreamTypeData) | |
dataStream, err := streamConn.CreateStream(headers) | |
fmt.Println(dataStream, err) | |
if err != nil { | |
return | |
} | |
localError := make(chan struct{}) | |
remoteDone := make(chan struct{}) | |
go func() { | |
// Copy from the remote side to the local port. | |
_, err := io.Copy(os.Stdout, dataStream) | |
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { | |
//runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) | |
fmt.Println(err) | |
} | |
// inform the select below that the remote copy is done | |
close(remoteDone) | |
}() | |
go func() { | |
// inform server we're not sending any more data after copy unblocks | |
defer dataStream.Close() | |
// Copy from the local port to the remote side. | |
_, err := io.Copy(dataStream, bytes.NewBufferString("GET /\n")) | |
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { | |
//runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) | |
fmt.Println(err) | |
// break out of the select below without waiting for the other copy to finish | |
close(localError) | |
} | |
}() | |
// wait for either a local->remote error or for copying from remote->local to finish | |
select { | |
case <-remoteDone: | |
// remote side finished sending | |
case <-localError: | |
// local error while sending data to remote | |
case <-streamConn.CloseChan(): | |
// forwarding connection closed | |
} | |
// always expect something on errorChan (it may be nil) | |
err = <-errorChan | |
if err != nil { | |
fmt.Println(err) | |
} | |
} | |
// Create an httpstream.Dialer for use with portforward.New | |
func dialer(config *rest.Config, clientset *kubernetes.Clientset, ns, pod string) (httpstream.Dialer, error) { | |
url := clientset.CoreV1().RESTClient().Post(). | |
Resource("pods"). | |
Namespace(ns). | |
Name(pod). | |
SubResource("portforward").URL() | |
fmt.Println("URL:", url) | |
transport, upgrader, err := spdy.RoundTripperFor(config) | |
if err != nil { | |
return nil, errors.Wrap(err, "Could not create round tripper") | |
} | |
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) | |
return dialer, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment