Skip to content

Instantly share code, notes, and snippets.

@craftyc0der
Last active August 21, 2021 15:47
Show Gist options
  • Save craftyc0der/9c45d018976b6f1ff11c7fff2fcb1864 to your computer and use it in GitHub Desktop.
Save craftyc0der/9c45d018976b6f1ff11c7fff2fcb1864 to your computer and use it in GitHub Desktop.
Kubernetes Controller Demo

Kubernetes Controller Demo

The purpose of this gist is to share an example of how you can watch the pod lifecycle from within (or outside of) a kubernetes cluster. This allows me to launch a Service whenever a pod is created and delete the service whenever the pod is deleted.

My specific use case is creating an Ambassador Emissary Ingress Service in response to the creation of an Agones GameServer. I have need to host web games that benefit from using edge load balancers that off load TLS encryption.

When I allocate the GameServer, I add specific label which I then search for with this code to create the Service.

Build Controller

go mod download github.com/imdario/mergo
go get k8s.io/client-go/[email protected]
go get informer
go test
go build

Run controller locally

./informer --kubeconfig ~/.kube/config

Expected Output When Pod Is Created

I0817 06:40:38.932324 1488737 informer.go:166] SERVICE CREATED: game-def456
I0817 06:40:38.932356 1488737 informer.go:64] POD CREATED: default/syou-game-server-fnt8w/def456
...
I0817 06:40:38.932356 1488737 informer.go:64] POD DELETED: default/syou-game-server-fnt8w/def456
I0817 06:40:38.932324 1488737 informer.go:166] SERVICE DELETED: game-def456
module informer
go 1.16
require (
github.com/imdario/mergo v0.3.9 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
k8s.io/api v0.18.3
k8s.io/client-go v0.18.3
k8s.io/klog/v2 v2.1.0
k8s.io/kubectl v0.18.3
k8s.io/apimachinery v0.18.3
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 // indirect
)
package main
import (
"context"
"flag"
"fmt"
"strings"
"time"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
klog "k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubectl/pkg/util/logs"
)
// PodLoggingController logs the name and namespace of pods that are added,
// deleted, or updated
type PodLoggingController struct {
informerFactory informers.SharedInformerFactory
podInformer coreinformers.PodInformer
}
// define the label we are looking for
// define service annotation for Ambassador
const (
SYNTH_LABEL = "synth"
annotations = `
---
apiVersion: x.getambassador.io/v3alpha1
kind: AmbassadorMapping
name: game-mapping-$label
prefix: /game/$label
service: game-$label
allow_upgrade:
- websocket
hostname: '*'
`
)
// Run starts shared informers and waits for the shared informer cache to
// synchronize.
func (c *PodLoggingController) Run(stopCh chan struct{}) error {
// Starts all the shared informers that have been created by the factory so
// far.
c.informerFactory.Start(stopCh)
// wait for the initial synchronization of the local cache.
if !cache.WaitForCacheSync(stopCh, c.podInformer.Informer().HasSynced) {
return fmt.Errorf("failed to sync")
}
return nil
}
func (c *PodLoggingController) podAdd(obj interface{}) {
pod := obj.(*v1.Pod)
label, ok := pod.GetLabels()[SYNTH_LABEL]
if ok {
clientset, _ := getK8SClient()
klog.Infof("POD CREATED: %s/%s/%s", pod.Namespace, pod.Name, label)
createService(clientset, label, pod)
}
}
func (c *PodLoggingController) podUpdate(old, new interface{}) {
oldPod := old.(*v1.Pod)
newPod := new.(*v1.Pod)
newLabel, newOk := newPod.GetLabels()[SYNTH_LABEL]
if newOk {
clientset, _ := getK8SClient()
klog.Infof(
"POD UPDATED. %s/%s/%s %s",
oldPod.Namespace, oldPod.Name, newLabel, newPod.Status.Phase,
)
createService(clientset, newLabel, newPod)
}
}
func (c *PodLoggingController) podDelete(obj interface{}) {
pod := obj.(*v1.Pod)
label, ok := pod.GetLabels()[SYNTH_LABEL]
if ok {
clientset, _ := getK8SClient()
deleteService(clientset, label)
klog.Infof("POD DELETED: %s/%s/%s", pod.Namespace, pod.Name, label)
}
}
// NewPodLoggingController creates a PodLoggingController
func NewPodLoggingController(informerFactory informers.SharedInformerFactory) *PodLoggingController {
podInformer := informerFactory.Core().V1().Pods()
c := &PodLoggingController{
informerFactory: informerFactory,
podInformer: podInformer,
}
podInformer.Informer().AddEventHandler(
// Your custom resource event handlers.
cache.ResourceEventHandlerFuncs{
// Called on creation
AddFunc: c.podAdd,
// Called on resource update and every resyncPeriod on existing resources.
UpdateFunc: c.podUpdate,
// Called on resource deletion.
DeleteFunc: c.podDelete,
},
)
return c
}
var kubeconfig string
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
}
func main() {
flag.Parse()
logs.InitLogs()
defer logs.FlushLogs()
clientset, _ := getK8SClient()
factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)
controller := NewPodLoggingController(factory)
stop := make(chan struct{})
defer close(stop)
err := controller.Run(stop)
if err != nil {
klog.Fatal(err)
}
select {}
}
// Create K8S Client
func getK8SClient() (*kubernetes.Clientset, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}
return clientset, err
}
// Serach for Service associated with GameServer
func findService(clientset kubernetes.Interface, label string) *v1.Service {
nm := "game-" + label
listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + nm}
// FieldSelector filtering does not work in unit tests
svcs, err := clientset.CoreV1().Services("default").List(context.Background(), listOptions)
if err != nil {
klog.Fatal(err)
}
for _, svc := range svcs.Items {
if svc.Name == nm {
return &svc
}
}
return nil
}
// Create Service associated with GameServer
func createService(clientset kubernetes.Interface, label string, pod *v1.Pod) *v1.Service {
port := pod.Spec.Containers[0].Ports[0].ContainerPort
svc := findService(clientset, label)
if svc != nil {
klog.Infof("SERVICE EXISTS: %s", svc.Name)
return nil
} else {
servicesClient := clientset.CoreV1().Services("default")
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "game-" + label,
Namespace: "default",
Labels: map[string]string{
"synth": label,
},
Annotations: map[string]string{
"getambassador.io/config": strings.Replace(annotations, "$label", label, -1),
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.IntOrString{
Type: intstr.Type(0),
IntVal: port,
},
},
},
Selector: map[string]string{
"synth": label,
},
},
}
// Create Service
_, err := servicesClient.Create(context.Background(), service, metav1.CreateOptions{})
if err != nil {
panic(err)
}
klog.Infof("SERVICE CREATED: game-%s", label)
return service
}
}
// Delete Service associated with GameServer
func deleteService(clientset *kubernetes.Clientset, label string) {
svc := findService(clientset, label)
if svc == nil {
klog.Infof("SERVICE MISSING: game-%s", label)
} else {
servicesClient := clientset.CoreV1().Services("default")
err := servicesClient.Delete(context.Background(), svc.Name, metav1.DeleteOptions{})
if err != nil {
panic(err)
}
klog.Infof("SERVICE DELETED: %s", svc.Name)
}
}
package main
import (
"strings"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
func TestCreateService(t *testing.T) {
t.Parallel()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "gameWithLabelAndService",
Namespace: "default",
Annotations: map[string]string{
"synth": "abc123",
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "app:v4",
Image: "abc/app:v4",
Ports: []v1.ContainerPort{{Name: "http", ContainerPort: 8080}},
}},
},
}
data := []struct {
clientset kubernetes.Interface
pod v1.Pod
expectService bool
inputNamespace string
inputTag string
err error
}{
// Pods are in the system that match the creteria
{
clientset: fake.NewSimpleClientset(
pod,
&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "game-abc123",
Namespace: "default",
Labels: map[string]string{
"synth": "abc123",
},
Annotations: map[string]string{
"getambassador.io/config": strings.Replace(annotations, "$label", "abc123", -1),
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.IntOrString{
Type: intstr.Type(0),
IntVal: 8080,
},
},
},
Selector: map[string]string{
"synth": "abc123",
},
},
},
),
pod: *pod,
inputNamespace: "default",
expectService: true,
inputTag: "abc123",
},
// there are pods in the correct namespace without matching service
{
clientset: fake.NewSimpleClientset(
pod,
&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "game-zabc123",
Namespace: "default",
Labels: map[string]string{
"synth": "zabc123",
},
Annotations: map[string]string{
"getambassador.io/config": strings.Replace(annotations, "$label", "zabc123", -1),
},
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.IntOrString{
Type: intstr.Type(0),
IntVal: 8080,
},
},
},
Selector: map[string]string{
"synth": "zabc123",
},
},
}),
pod: *pod,
inputNamespace: "default",
expectService: false,
inputTag: "def456",
},
}
for _, single := range data {
t.Run("", func(single struct {
clientset kubernetes.Interface
pod v1.Pod
expectService bool
inputNamespace string
inputTag string
err error
}) func(t *testing.T) {
return func(t *testing.T) {
service := createService(single.clientset, single.inputTag, pod)
if !single.expectService && service == nil {
t.Fatal("found unexpected service")
} else if single.expectService && service != nil {
t.Fatal("expected service missing")
}
}
}(single))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment