|
package main |
|
|
|
import ( |
|
"context" |
|
"flag" |
|
"fmt" |
|
"strings" |
|
"time" |
|
|
|
"k8s.io/client-go/informers" |
|
coreinformers "k8s.io/client-go/informers/core/v1" |
|
"k8s.io/client-go/kubernetes" |
|
klog "k8s.io/klog/v2" |
|
|
|
v1 "k8s.io/api/core/v1" |
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
|
"k8s.io/apimachinery/pkg/util/intstr" |
|
"k8s.io/client-go/tools/cache" |
|
"k8s.io/client-go/tools/clientcmd" |
|
"k8s.io/kubectl/pkg/util/logs" |
|
) |
|
|
|
// PodLoggingController watches Pod add, update and delete events through a |
// shared informer and logs them. Its handlers (podAdd, podUpdate, podDelete) |
|
// also call createService / deleteService for pods carrying the SYNTH_LABEL label. |
|
type PodLoggingController struct { |
|
// informerFactory is the factory that Run starts. |
informerFactory informers.SharedInformerFactory |
|
// podInformer is the Pod informer the event handlers are registered on. |
podInformer coreinformers.PodInformer |
|
} |
|
|
|
// SYNTH_LABEL is the pod label key the event handlers look for; the label's |
// value is used as the <label> suffix of the "game-<label>" Service name. |
|
// annotations is the template stored in the Service's "getambassador.io/config" |
// annotation; createService replaces every "$label" with the pod's label value. |
|
const ( |
|
SYNTH_LABEL = "synth" |
|
annotations = ` |
|
--- |
|
apiVersion: x.getambassador.io/v3alpha1 |
|
kind: AmbassadorMapping |
|
name: game-mapping-$label |
|
prefix: /game/$label |
|
service: game-$label |
|
allow_upgrade: |
|
- websocket |
|
hostname: '*' |
|
` |
|
) |
|
|
|
// Run starts shared informers and waits for the shared informer cache to |
|
// synchronize. |
|
func (c *PodLoggingController) Run(stopCh chan struct{}) error { |
|
// Starts all the shared informers that have been created by the factory so |
|
// far. |
|
c.informerFactory.Start(stopCh) |
|
// wait for the initial synchronization of the local cache. |
|
if !cache.WaitForCacheSync(stopCh, c.podInformer.Informer().HasSynced) { |
|
return fmt.Errorf("failed to sync") |
|
} |
|
return nil |
|
} |
|
|
|
func (c *PodLoggingController) podAdd(obj interface{}) { |
|
pod := obj.(*v1.Pod) |
|
label, ok := pod.GetLabels()[SYNTH_LABEL] |
|
if ok { |
|
clientset, _ := getK8SClient() |
|
klog.Infof("POD CREATED: %s/%s/%s", pod.Namespace, pod.Name, label) |
|
createService(clientset, label, pod) |
|
} |
|
} |
|
|
|
func (c *PodLoggingController) podUpdate(old, new interface{}) { |
|
oldPod := old.(*v1.Pod) |
|
newPod := new.(*v1.Pod) |
|
newLabel, newOk := newPod.GetLabels()[SYNTH_LABEL] |
|
if newOk { |
|
clientset, _ := getK8SClient() |
|
klog.Infof( |
|
"POD UPDATED. %s/%s/%s %s", |
|
oldPod.Namespace, oldPod.Name, newLabel, newPod.Status.Phase, |
|
) |
|
createService(clientset, newLabel, newPod) |
|
} |
|
} |
|
|
|
func (c *PodLoggingController) podDelete(obj interface{}) { |
|
pod := obj.(*v1.Pod) |
|
label, ok := pod.GetLabels()[SYNTH_LABEL] |
|
if ok { |
|
clientset, _ := getK8SClient() |
|
deleteService(clientset, label) |
|
klog.Infof("POD DELETED: %s/%s/%s", pod.Namespace, pod.Name, label) |
|
} |
|
} |
|
|
|
// NewPodLoggingController creates a PodLoggingController |
|
func NewPodLoggingController(informerFactory informers.SharedInformerFactory) *PodLoggingController { |
|
podInformer := informerFactory.Core().V1().Pods() |
|
|
|
c := &PodLoggingController{ |
|
informerFactory: informerFactory, |
|
podInformer: podInformer, |
|
} |
|
podInformer.Informer().AddEventHandler( |
|
// Your custom resource event handlers. |
|
cache.ResourceEventHandlerFuncs{ |
|
// Called on creation |
|
AddFunc: c.podAdd, |
|
// Called on resource update and every resyncPeriod on existing resources. |
|
UpdateFunc: c.podUpdate, |
|
// Called on resource deletion. |
|
DeleteFunc: c.podDelete, |
|
}, |
|
) |
|
return c |
|
} |
|
|
|
// kubeconfig holds the value of the -kubeconfig command-line flag.
var kubeconfig string

// init registers the -kubeconfig flag on the default command-line flag set,
// with an empty default value.
func init() {
	const usage = "absolute path to the kubeconfig file"
	flag.CommandLine.StringVar(&kubeconfig, "kubeconfig", "", usage)
}
|
|
|
func main() { |
|
flag.Parse() |
|
logs.InitLogs() |
|
defer logs.FlushLogs() |
|
|
|
clientset, _ := getK8SClient() |
|
|
|
factory := informers.NewSharedInformerFactory(clientset, time.Hour*24) |
|
controller := NewPodLoggingController(factory) |
|
stop := make(chan struct{}) |
|
defer close(stop) |
|
err := controller.Run(stop) |
|
if err != nil { |
|
klog.Fatal(err) |
|
} |
|
select {} |
|
} |
|
|
|
// Create K8S Client |
|
func getK8SClient() (*kubernetes.Clientset, error) { |
|
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) |
|
if err != nil { |
|
panic(err.Error()) |
|
} |
|
|
|
clientset, err := kubernetes.NewForConfig(config) |
|
if err != nil { |
|
klog.Fatal(err) |
|
} |
|
|
|
return clientset, err |
|
} |
|
|
|
// Serach for Service associated with GameServer |
|
func findService(clientset kubernetes.Interface, label string) *v1.Service { |
|
nm := "game-" + label |
|
listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + nm} |
|
// FieldSelector filtering does not work in unit tests |
|
svcs, err := clientset.CoreV1().Services("default").List(context.Background(), listOptions) |
|
if err != nil { |
|
klog.Fatal(err) |
|
} |
|
for _, svc := range svcs.Items { |
|
if svc.Name == nm { |
|
return &svc |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Create Service associated with GameServer |
|
func createService(clientset kubernetes.Interface, label string, pod *v1.Pod) *v1.Service { |
|
port := pod.Spec.Containers[0].Ports[0].ContainerPort |
|
svc := findService(clientset, label) |
|
if svc != nil { |
|
klog.Infof("SERVICE EXISTS: %s", svc.Name) |
|
return nil |
|
} else { |
|
servicesClient := clientset.CoreV1().Services("default") |
|
|
|
service := &v1.Service{ |
|
ObjectMeta: metav1.ObjectMeta{ |
|
Name: "game-" + label, |
|
Namespace: "default", |
|
Labels: map[string]string{ |
|
"synth": label, |
|
}, |
|
Annotations: map[string]string{ |
|
"getambassador.io/config": strings.Replace(annotations, "$label", label, -1), |
|
}, |
|
}, |
|
Spec: v1.ServiceSpec{ |
|
Ports: []v1.ServicePort{ |
|
{ |
|
Name: "http", |
|
Port: 80, |
|
TargetPort: intstr.IntOrString{ |
|
Type: intstr.Type(0), |
|
IntVal: port, |
|
}, |
|
}, |
|
}, |
|
Selector: map[string]string{ |
|
"synth": label, |
|
}, |
|
}, |
|
} |
|
|
|
// Create Service |
|
_, err := servicesClient.Create(context.Background(), service, metav1.CreateOptions{}) |
|
if err != nil { |
|
panic(err) |
|
} |
|
klog.Infof("SERVICE CREATED: game-%s", label) |
|
return service |
|
} |
|
} |
|
|
|
// Delete Service associated with GameServer |
|
func deleteService(clientset *kubernetes.Clientset, label string) { |
|
svc := findService(clientset, label) |
|
if svc == nil { |
|
klog.Infof("SERVICE MISSING: game-%s", label) |
|
} else { |
|
servicesClient := clientset.CoreV1().Services("default") |
|
err := servicesClient.Delete(context.Background(), svc.Name, metav1.DeleteOptions{}) |
|
if err != nil { |
|
panic(err) |
|
} |
|
klog.Infof("SERVICE DELETED: %s", svc.Name) |
|
} |
|
} |