Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save olegch/b08a59406792441330b938ed11c79580 to your computer and use it in GitHub Desktop.
Save olegch/b08a59406792441330b938ed11c79580 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
// main opens a port-forward connection to the first pod listed in the
// "default" namespace, sends a bare "GET /" to the pod's port 80 over the
// data stream, and copies whatever comes back to stdout.
//
// The intermediate value and error of every step are printed for debugging.
// Any failure stops the program by returning from main.
func main() {
	// remotePort is the pod port the error and data streams are bound to.
	const remotePort = 80

	fmt.Println("K8S forwarder started")

	// load config
	configBytes, err := ioutil.ReadFile("/home/olegch/.kube/config")
	fmt.Println(err)
	if err != nil {
		return
	}
	config, err := clientcmd.NewClientConfigFromBytes(configBytes)
	fmt.Println(config, err)
	if err != nil {
		return
	}

	// create client set
	clientConfig, err := config.ClientConfig()
	fmt.Println(clientConfig, err)
	if err != nil {
		return
	}
	clientset, err := kubernetes.NewForConfig(clientConfig)
	fmt.Println(clientset, err)
	if err != nil {
		return
	}

	// list pods
	pl, err := clientset.CoreV1().Pods("default").List(metav1.ListOptions{})
	fmt.Println(pl, err)
	if err != nil {
		return
	}
	// An empty namespace yields an empty Items slice; indexing it would
	// panic, so stop here with an explicit message instead.
	if len(pl.Items) == 0 {
		fmt.Println("no pods found in namespace \"default\"")
		return
	}
	target := pl.Items[0]

	// get dialer
	dl, err := dialer(clientConfig, clientset, target.ObjectMeta.Namespace, target.ObjectMeta.Name)
	fmt.Println(dl, err)
	if err != nil {
		return
	}

	// upgrade connection
	streamConn, str, err := dl.Dial(portforward.PortForwardProtocolV1Name)
	fmt.Println(streamConn, str, err)
	if err != nil {
		return
	}
	defer streamConn.Close()

	// Only one forwarding request is made, so its ID is always 1.
	requestID := 0
	requestID++

	// create error stream
	headers := http.Header{}
	headers.Set(v1.StreamType, v1.StreamTypeError)
	headers.Set(v1.PortHeader, strconv.Itoa(remotePort))
	headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
	errorStream, err := streamConn.CreateStream(headers)
	fmt.Println(errorStream, err)
	if err != nil {
		return
	}
	// we're not writing to this stream (the Close result is not checked)
	errorStream.Close()

	// read from error stream; the channel is closed after at most one send,
	// so the final receive in main yields nil when nothing was reported
	errorChan := make(chan error)
	go func() {
		message, err := ioutil.ReadAll(errorStream)
		switch {
		case err != nil:
			errorChan <- fmt.Errorf("error reading from error stream: %v", err)
		case len(message) > 0:
			errorChan <- fmt.Errorf("an error occurred forwarding: %v", string(message))
		}
		close(errorChan)
	}()

	// create data stream (same port and request ID, different stream type)
	headers.Set(v1.StreamType, v1.StreamTypeData)
	dataStream, err := streamConn.CreateStream(headers)
	fmt.Println(dataStream, err)
	if err != nil {
		return
	}

	localError := make(chan struct{})
	remoteDone := make(chan struct{})

	go func() {
		// Copy from the remote side to stdout.
		_, err := io.Copy(os.Stdout, dataStream)
		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			fmt.Println(err)
		}
		// inform the select below that the remote copy is done
		close(remoteDone)
	}()

	go func() {
		// inform server we're not sending any more data after copy unblocks
		defer dataStream.Close()

		// Send the fixed request to the remote side.
		_, err := io.Copy(dataStream, bytes.NewBufferString("GET /\n"))
		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			fmt.Println(err)
			// break out of the select below without waiting for the other copy to finish
			close(localError)
		}
	}()

	// wait for either a local->remote error or for copying from remote->local to finish
	select {
	case <-remoteDone:
		// remote side finished sending
	case <-localError:
		// local error while sending data to remote
	case <-streamConn.CloseChan():
		// forwarding connection closed
	}

	// always expect something on errorChan (it may be nil)
	err = <-errorChan
	if err != nil {
		fmt.Println(err)
	}
}
// dialer builds an httpstream.Dialer that POSTs to the "portforward"
// subresource of the given pod.
//
// ns and pod select the target pod; config supplies the settings for the SPDY
// round tripper. The computed URL is printed for debugging. A non-nil error
// is returned only when the round tripper cannot be created.
func dialer(config *rest.Config, clientset *kubernetes.Clientset, ns, pod string) (httpstream.Dialer, error) {
	// Assemble the request for the pod's portforward subresource step by step.
	req := clientset.CoreV1().RESTClient().Post()
	req = req.Resource("pods").Namespace(ns).Name(pod).SubResource("portforward")
	endpoint := req.URL()
	fmt.Println("URL:", endpoint)

	roundTripper, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		return nil, errors.Wrap(err, "Could not create round tripper")
	}

	httpClient := &http.Client{Transport: roundTripper}
	return spdy.NewDialer(upgrader, httpClient, "POST", endpoint), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment