Created
June 4, 2018 15:36
-
-
Save slok/65f5a9b395830fc6c9541573ede2c695 to your computer and use it in GitHub Desktop.
Kubernetes controller that updates an annotation on pods with the `test: kooper` label
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"os" | |
"path/filepath" | |
"time" | |
corev1 "k8s.io/api/core/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/runtime" | |
"k8s.io/apimachinery/pkg/watch" | |
"k8s.io/client-go/kubernetes" | |
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc" | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/util/homedir" | |
"github.com/spotahome/kooper/log" | |
"github.com/spotahome/kooper/operator/controller" | |
"github.com/spotahome/kooper/operator/handler" | |
"github.com/spotahome/kooper/operator/retrieve" | |
) | |
func main() { | |
// Initialize logger. | |
log := &log.Std{} | |
// Get k8s client. | |
k8scfg, err := rest.InClusterConfig() | |
if err != nil { | |
// No in cluster? letr's try locally | |
kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config") | |
k8scfg, err = clientcmd.BuildConfigFromFlags("", kubehome) | |
if err != nil { | |
log.Errorf("error loading kubernetes configuration: %s", err) | |
os.Exit(1) | |
} | |
} | |
k8scli, err := kubernetes.NewForConfig(k8scfg) | |
if err != nil { | |
log.Errorf("error creating kubernetes client: %s", err) | |
os.Exit(1) | |
} | |
// Create our retriever so the controller knows how to get/listen for pod events. | |
retr := &retrieve.Resource{ | |
Object: &corev1.Pod{}, | |
ListerWatcher: &cache.ListWatch{ | |
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | |
// Just apply logic on this type of pods. | |
options.LabelSelector = "test=kooper" | |
return k8scli.CoreV1().Pods("").List(options) | |
}, | |
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | |
// Just apply logic on this type of pods. | |
options.LabelSelector = "test=kooper" | |
return k8scli.CoreV1().Pods("").Watch(options) | |
}, | |
}, | |
} | |
// Our domain logic that will update annotations every add/sync/update of filtered pods. | |
hand := &handler.HandlerFunc{ | |
AddFunc: func(obj runtime.Object) error { | |
pod := obj.(*corev1.Pod) | |
log.Infof("Pod being handled: %s", pod.ObjectMeta.Name) | |
// Copy pod and update the annotation. | |
newPod := pod.DeepCopy() | |
ann := newPod.ObjectMeta.Annotations | |
if ann == nil { | |
ann = make(map[string]string) | |
} | |
ann["kooper"] = "handled" | |
newPod.ObjectMeta.Annotations = ann | |
_, err := k8scli.CoreV1().Pods(newPod.ObjectMeta.Namespace).Update(newPod) | |
if err != nil { | |
return err | |
} | |
log.Infof("Pod annotation updated: %s", pod.ObjectMeta.Name) | |
return nil | |
}, | |
DeleteFunc: func(s string) error { | |
log.Infof("Pod deleted: %s", s) | |
return nil | |
}, | |
} | |
// Create the controller that will refresh every 30 seconds. | |
ctrl := controller.NewSequential(30*time.Second, hand, retr, nil, log) | |
// Start our controller. | |
stopC := make(chan struct{}) | |
if err := ctrl.Run(stopC); err != nil { | |
log.Errorf("error running controller: %s", err) | |
os.Exit(1) | |
} | |
os.Exit(0) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Test
Listen to Resources
We listen to
corev1.Pod
resource kinds (in all namespaces) and only the ones that have the `test: kooper`
label: https://gist.github.com/slok/65f5a9b395830fc6c9541573ede2c695#file-main-go-L50-LL58
Update pod annotation
We make a deep copy of the object, ensure the annotations map exists (creating it if it does not), add our annotation
kooper: handled
and update the pod with the Kubernetes client. https://gist.github.com/slok/65f5a9b395830fc6c9541573ede2c695#file-main-go-L69-L81