Last active
October 14, 2020 18:32
-
-
Save fntlnz/65ec4e7273b15e858dda97c0f6f2241b to your computer and use it in GitHub Desktop.
Kubernetes controller example - Related post at: https://medium.com/@fntlnz/what-i-learnt-about-kubernetes-controllers-db7591531973
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[[constraint]] | |
name = "k8s.io/api" | |
version = "kubernetes-1.11.0" | |
[[constraint]] | |
name = "k8s.io/apimachinery" | |
version = "kubernetes-1.11.0" | |
[[constraint]] | |
name = "k8s.io/client-go" | |
version = "kubernetes-1.11.0" | |
[prune] | |
go-tests = true | |
unused-packages = true | |
[[override]] | |
name = "github.com/json-iterator/go" | |
version = "1.1.5" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
"go.uber.org/zap" | |
"k8s.io/api/core/v1" | |
"k8s.io/apimachinery/pkg/fields" | |
"k8s.io/apimachinery/pkg/util/runtime" | |
"k8s.io/apimachinery/pkg/util/wait" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/tools/cache" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/util/workqueue" | |
) | |
type Controller struct { | |
informer cache.SharedInformer | |
queue workqueue.RateLimitingInterface | |
logger *zap.Logger | |
} | |
func NewController(queue workqueue.RateLimitingInterface, informer cache.SharedInformer) *Controller { | |
return &Controller{ | |
informer: informer, | |
queue: queue, | |
logger: zap.NewNop(), | |
} | |
} | |
func (c *Controller) WithLogger(logger *zap.Logger) { | |
c.logger = logger | |
} | |
func (c *Controller) processNextItem() bool { | |
key, quit := c.queue.Get() | |
if quit { | |
return false | |
} | |
defer c.queue.Done(key) | |
err := c.syncToStdout(key.(string)) | |
c.handleErr(err, key) | |
return true | |
} | |
func (c *Controller) syncToStdout(key string) error { | |
obj, exists, err := c.informer.GetStore().GetByKey(key) | |
if err != nil { | |
c.logger.Error("error fetching object from index for the specified key", zap.String("key", key), zap.Error(err)) | |
return err | |
} | |
if !exists { | |
c.logger.Info("pod has gone", zap.String("key", key)) | |
// do your heavy stuff for when the pod is gone here | |
} else { | |
c.logger.Info("update received for pod", zap.String("key", key), zap.String("pod", obj.(*v1.Pod).GetName())) | |
// do your heavy stuff for when the pod is created/updated here | |
} | |
return nil | |
} | |
func (c *Controller) handleErr(err error, key interface{}) { | |
if err == nil { | |
c.queue.Forget(key) | |
return | |
} | |
if c.queue.NumRequeues(key) < 5 { | |
c.logger.Info("error during sync", zap.String("key", key.(string)), zap.Error(err)) | |
c.queue.AddRateLimited(key) | |
return | |
} | |
c.queue.Forget(key) | |
runtime.HandleError(err) | |
c.logger.Info("drop pod out of queue after many retries", zap.String("key", key.(string)), zap.Error(err)) | |
} | |
func (c *Controller) Run(threadiness int, stopCh chan struct{}) { | |
defer runtime.HandleCrash() | |
defer c.queue.ShutDown() | |
c.logger.Info("starting controller") | |
go c.informer.Run(stopCh) | |
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { | |
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) | |
return | |
} | |
for i := 0; i < threadiness; i++ { | |
go wait.Until(c.runWorker, time.Second, stopCh) | |
} | |
<-stopCh | |
c.logger.Info("stopping controller") | |
} | |
func (c *Controller) runWorker() { | |
for c.processNextItem() { | |
} | |
} | |
func main() { | |
logger, err := zap.NewProduction() | |
if err != nil { | |
panic(err) | |
} | |
rules := clientcmd.NewDefaultClientConfigLoadingRules() | |
overrides := &clientcmd.ConfigOverrides{} | |
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() | |
if err != nil { | |
logger.Fatal("error getting kubernetes client config", zap.Error(err)) | |
} | |
clientset, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
logger.Fatal("error creating kubernetes client", zap.Error(err)) | |
} | |
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) | |
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) | |
si := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, 0) | |
si.AddEventHandler(cache.ResourceEventHandlerFuncs{ | |
AddFunc: func(obj interface{}) { | |
key, err := cache.MetaNamespaceKeyFunc(obj) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
UpdateFunc: func(old interface{}, new interface{}) { | |
key, err := cache.MetaNamespaceKeyFunc(new) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
DeleteFunc: func(obj interface{}) { | |
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) | |
if err == nil { | |
queue.Add(key) | |
} | |
}, | |
}) | |
controller := NewController(queue, si) | |
controller.WithLogger(logger) | |
stop := make(chan struct{}) | |
defer close(stop) | |
go controller.Run(1, stop) | |
select {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment