@cmwylie19
Created April 3, 2022 13:49
ETCD Backup
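// This program is an in-cluster controller for backing up and restoring etcd
// on a minikube node. It serves HTTP endpoints for listing, creating, and
// restoring etcd snapshots, basic pod management, a health check, and
// Prometheus metrics; the backup and restore work itself is performed by
// Kubernetes Jobs that the handlers create.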
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Objects
type K8s struct {
clientset *kubernetes.Clientset
}
type ServerOpts struct {
Port uint32
}
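// controllerProvider is the concrete value returned by NewControllerServer.
// ControllerProvider is currently an empty marker interface, left as a hook
// for exposing controller behavior to callers later.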
type controllerProvider struct {
}
type ControllerProvider interface {
}
// Controllers
func (k *K8s) initDefaults() {
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
k.clientset = clientset
}
func (c *ServerOpts) initDefaults() {
if c.Port == 0 {
c.Port = 8080
}
}
var (
healthCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "http_healthcheck_requests_total",
Help: "Count of all HTTP healthcheck requests.",
})
backupCreateCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "etcd_backup_create_pass_total",
Help: "The total number of backups created successfully",
})
backupCreateCounterFail = promauto.NewCounter(prometheus.CounterOpts{
Name: "etcd_backup_create_fail_total",
Help: "The total number of failed backup creations",
})
backupRestoreCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "etcd_backup_restore_pass_total",
Help: "The total number of backups restored successfully",
})
backupRestoreCounterFail = promauto.NewCounter(prometheus.CounterOpts{
Name: "etcd_backup_restore_fail_total",
Help: "The total number of failed backup restores",
})
podDeleteCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_delete_pass_total",
Help: "The total number of pods deleted successfully",
})
podDeleteCounterFail = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_delete_fail_total",
Help: "The total number of failed pod deletions",
})
podCreateCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_create_pass_total",
Help: "The total number of pods created successfully",
})
podCreateCounterFail = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_create_fail_total",
Help: "The total number of failed pod creations",
})
podListCounterPass = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_list_pass_total",
Help: "The total number of successful pod list requests",
})
podListCounterFail = promauto.NewCounter(prometheus.CounterOpts{
Name: "pod_list_fail_total",
Help: "The total number of failed pod list requests",
})
)
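// When /controller/metrics is scraped, each counter is exposed in the standard
// Prometheus text format, roughly like this (the value shown is illustrative):
//
//   # HELP etcd_backup_create_pass_total The total number of backups created successfully
//   # TYPE etcd_backup_create_pass_total counter
//   etcd_backup_create_pass_total 3
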
func NewControllerServer(ctx context.Context, k *K8s, opts *ServerOpts) (ControllerProvider, error) {
// init defaults
k.initDefaults()
opts.initDefaults()
serveMux := http.NewServeMux()
serveMux.Handle("/controller/metrics", promhttp.Handler())
serveMux.HandleFunc("/controller", indexHandler)
serveMux.HandleFunc("/controller/backups/list", k.ListBackups)
serveMux.HandleFunc("/controller/backups/restore", k.RestoreBackup)
serveMux.HandleFunc("/controller/backups/create", k.CreateBackup)
serveMux.HandleFunc("/controller/pods/list", k.GetPods)
serveMux.HandleFunc("/controller/pods/create", k.CreatePod)
serveMux.HandleFunc("/controller/pods/delete", k.DeletePod)
serveMux.HandleFunc("/controller/healthz", k.HealthCheck)
webserver := &http.Server{
Addr: fmt.Sprintf(":%d", opts.Port),
Handler: serveMux,
}
// Serve in the background; cancelling ctx (below) closes the server.
go func() {
if err := webserver.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Println("HTTP server stopped: ", err)
}
}()
go func() {
<-ctx.Done()
webserver.Close()
}()
return &controllerProvider{}, nil
}
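// Example requests against a controller listening on the default port 8080;
// the paths and query parameters mirror the routes registered above, and the
// pod name/image values are only illustrative:
//
//   curl localhost:8080/controller/healthz
//   curl localhost:8080/controller/backups/list
//   curl localhost:8080/controller/backups/create
//   curl "localhost:8080/controller/backups/restore?backup=<snapshot-file>"
//   curl "localhost:8080/controller/pods/create?name=nginx&image=nginx"
//   curl "localhost:8080/controller/pods/delete?name=nginx"
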
// Route Handlers
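// ListBackups returns the snapshot files found under /data/backups as JSON.
// An example response (filenames follow the pattern used by CreateBackup):
//
//   {"backups":["snap-2022-04-03_13:49:00_UTC.db"]}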
func (k *K8s) ListBackups(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
backups := make(map[string][]string)
files, err := ioutil.ReadDir("/data/backups")
if err != nil {
log.Fatal(err)
}
for _, file := range files {
backups["backups"] = append(backups["backups"], file.Name())
fmt.Println(file.Name())
}
jsonResponse, err := json.Marshal(backups)
if err != nil {
log.Fatalf("Error in JSON marshal. Err: %s", err)
}
w.Write(jsonResponse)
// SendResponse("status", "Success", w, r)
}
func (k *K8s) DeletePod(w http.ResponseWriter, r *http.Request) {
var gracePeriodSeconds int64 = 0
bg := metav1.DeletePropagationBackground
name := r.URL.Query().Get("name")
if name == "" {
http.Error(w, "Must provide name in query string", http.StatusBadRequest)
return
}
err := k.clientset.CoreV1().Pods("default").Delete(context.TODO(), name, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &bg,
})
if err != nil {
podDeleteCounterFail.Inc()
log.Println("Failed to delete K8s pod. ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
podDeleteCounterPass.Inc()
log.Printf("Deleted %s pod successfully\n", name)
SendResponse("status", "Success", w, r)
}
func (k8s *K8s) CreatePod(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
image := r.URL.Query().Get("image")
pods := k8s.clientset.CoreV1().Pods("default")
fmt.Printf("name: %s\n\nimage: %s\n", name, image)
if name == "" || image == "" {
fmt.Printf("name %s\nimage %s\n", name, image)
http.Error(w, "Must Provider name and image in query string", http.StatusInternalServerError)
return
}
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Labels: map[string]string{
"app": name,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: name,
Image: image,
ImagePullPolicy: v1.PullIfNotPresent,
},
},
},
}
_, err := pods.Create(context.TODO(), podSpec, metav1.CreateOptions{})
if err != nil {
podCreateCounterFail.Inc()
log.Println("Failed to create K8s pod. ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Println("Created K8s pod successfully")
podCreateCounterPass.Inc()
SendResponse("status", "Success", w, r)
}
func (k *K8s) GetPods(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
pods, err := k.clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
podListCounterFail.Inc()
log.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
jsonResponse, err := json.Marshal(pods)
if err != nil {
podListCounterFail.Inc()
log.Printf("Error in JSON marshal. Err: %s\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
podListCounterPass.Inc()
w.Write(jsonResponse)
}
func (k *K8s) HealthCheck(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
healthCounterPass.Inc()
SendResponse("status", "UP", w, r)
}
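// RestoreBackup creates a Kubernetes Job that restores etcd from the snapshot
// named by the "backup" query parameter. The Job's init containers run in order:
// wipe the restore directory, run "etcdctl snapshot restore" into it, swap the
// etcd static-pod manifest for the backup variant from the etcd-backup ConfigMap,
// empty the original etcd data directory, copy the restored member directory back
// into place, and finally put the original etcd manifest back.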
func (k8s *K8s) RestoreBackup(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
jobs := k8s.clientset.BatchV1().Jobs("default")
backupName := r.URL.Query().Get("backup")
if backupName == "" {
http.Error(w, "Must provide backup in query string", http.StatusBadRequest)
return
}
var backOffLimit int32 = 0
var completions int32 = 1
var runAsUser int64 = 0
var activeDeadlineSeconds int64 = 240
ts := time.Now().UTC().String()
jobSpec := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "restore-" + FormatJobName(ts),
Namespace: "default",
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Completions: &completions,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "restore-job",
Namespace: "default",
Labels: map[string]string{
"app": "restore-job",
},
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "etcd-manifests",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: "etcd-backup",
},
},
},
}, {
Name: "original-directory",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/var/lib/minikube/etcd",
},
},
}, {
Name: "restore-directory",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/var/lib/minikube/etcd-backup",
},
},
}, {
Name: "static-pods",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/etc/kubernetes/manifests",
},
},
},
{
Name: "etcd-backups",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "backup-job-pvc",
},
},
},
},
NodeName: "minikube",
InitContainers: []v1.Container{
{
Name: "rm-backup-dir",
Image: "busybox",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "rm -rf /var/lib/minikube/etcd-backup/member"},
VolumeMounts: []v1.VolumeMount{
{
Name: "restore-directory",
MountPath: "/var/lib/minikube/etcd-backup",
},
},
SecurityContext: &v1.SecurityContext{
RunAsUser: &runAsUser,
},
},
{
Name: "restore-etcd-to-dir",
Image: "bitnami/etcd:3.5.1",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "ETCDCTL_API=3 etcdctl snapshot restore --data-dir=/var/lib/minikube/etcd-backup /data/backups/$BACKUP_NAME"},
VolumeMounts: []v1.VolumeMount{
{
Name: "restore-directory",
MountPath: "/var/lib/minikube/etcd-backup",
}, {
Name: "etcd-backups",
MountPath: "/data/backups",
},
},
SecurityContext: &v1.SecurityContext{
RunAsUser: &runAsUser,
},
Env: []v1.EnvVar{
{
Name: "BACKUP_NAME",
Value: backupName,
},
},
},
{
Name: "update-etcd",
Image: "busybox",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "cat /updates/etcd.yaml > /etc/kubernetes/manifests/etcd.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: "etcd-manifests",
MountPath: "/updates",
}, {
Name: "static-pods",
MountPath: "/etc/kubernetes/manifests",
},
},
},
{
Name: "empty-original-etcd-dir",
Image: "busybox",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "rm -rf /var/lib/minikube/etcd/member"},
VolumeMounts: []v1.VolumeMount{
{
Name: "etcd-manifests",
MountPath: "/updates",
}, {
Name: "static-pods",
MountPath: "/etc/kubernetes/manifests",
}, {
Name: "restore-directory",
MountPath: "/var/lib/minikube/etcd-backup",
}, {
Name: "original-directory",
MountPath: "/var/lib/minikube/etcd",
},
},
},
{
Name: "copy-backup-dir-to-original",
Image: "busybox",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "cp -fr /var/lib/minikube/etcd-backup/member /var/lib/minikube/etcd/member"},
VolumeMounts: []v1.VolumeMount{
{
Name: "etcd-manifests",
MountPath: "/updates",
}, {
Name: "static-pods",
MountPath: "/etc/kubernetes/manifests",
}, {
Name: "restore-directory",
MountPath: "/var/lib/minikube/etcd-backup",
}, {
Name: "original-directory",
MountPath: "/var/lib/minikube/etcd",
},
},
},
{
Name: "update-etcd-original",
Image: "busybox",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "cat /updates/etcd-original.yaml > /etc/kubernetes/manifests/etcd.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: "etcd-manifests",
MountPath: "/updates",
}, {
Name: "static-pods",
MountPath: "/etc/kubernetes/manifests",
},
},
},
},
Containers: []v1.Container{
{
Name: "restore-job",
Image: "nginx",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "sleep 1s"},
},
},
RestartPolicy: v1.RestartPolicyNever,
},
},
BackoffLimit: &backOffLimit,
},
}
_, err := jobs.Create(context.TODO(), jobSpec, metav1.CreateOptions{})
if err != nil {
backupRestoreCounterFail.Inc()
log.Println("Failed to create K8s restore job. ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
backupRestoreCounterPass.Inc()
log.Println("Created K8s restore job successfully")
SendResponse("status", "Success", w, r)
}
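// CreateBackup creates a Kubernetes Job that runs "etcdctl snapshot save" against
// the endpoints stored in the etcd-certs Secret, writing a timestamped snapshot
// to the backup-job-pvc PersistentVolumeClaim mounted at /data/backups.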
func (k8s *K8s) CreateBackup(w http.ResponseWriter, r *http.Request) {
jobs := k8s.clientset.BatchV1().Jobs("default")
var backOffLimit int32 = 0
var completions int32 = 1
var runAsUser int64 = 0
var activeDeadlineSeconds int64 = 240
ts := time.Now().UTC().String()
jobSpec := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "backup-" + FormatJobName(ts),
Namespace: "default",
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Completions: &completions,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "backup",
Namespace: "default",
Labels: map[string]string{
"app": "backup-job",
},
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "etcd-certs",
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: "etcd-certs",
},
},
}, {
Name: "etcd-backups",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "backup-job-pvc",
},
},
},
},
NodeName: "minikube",
Containers: []v1.Container{
{
Name: "backup-job",
Image: "bitnami/etcd:3.5.1",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c", "ETCDCTL_API=3 etcdctl snapshot save /data/backups/snap-$(date +%Y-%m-%d_%H:%M:%S_%Z).db --dial-timeout=12s --debug=true --keepalive-timeout=30s --keepalive-time=30s --cacert=/certs/ca.crt --cert=/certs/server.crt --key=/certs/server.key --endpoints=$ENDPOINTS"},
VolumeMounts: []v1.VolumeMount{
{
Name: "etcd-certs",
MountPath: "/certs",
}, {
Name: "etcd-backups",
MountPath: "/data/backups",
},
},
SecurityContext: &v1.SecurityContext{
RunAsUser: &runAsUser,
},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("5m"),
v1.ResourceMemory: resource.MustParse("20Mi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2m"),
v1.ResourceMemory: resource.MustParse("12Mi"),
},
},
Env: []v1.EnvVar{
{
Name: "ENDPOINTS",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: "etcd-certs",
},
Key: "endpoints",
},
},
},
},
},
},
RestartPolicy: v1.RestartPolicyNever,
},
},
BackoffLimit: &backOffLimit,
},
}
_, err := jobs.Create(context.TODO(), jobSpec, metav1.CreateOptions{})
if err != nil {
backupCreateCounterFail.Inc()
log.Println("Failed to create K8s backup job. ", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
backupCreateCounterPass.Inc()
log.Println("Created K8s backup job successfully")
SendResponse("status", "Success", w, r)
}
func indexHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
if r.Method == "GET" {
w.Write([]byte("<body style=\"backgound-color: #fbfbfb\"><h1>ETCD Backend</h1></body>"))
} else {
http.Error(w, "Only GET requests are allowed!", http.StatusMethodNotAllowed)
}
}
// Helpers
// FormatJobName turns a time.Time.String() timestamp into a suffix that is safe
// to use in a Kubernetes object name: spaces, periods, and plus signs are stripped,
// colons become dashes, and the trailing time-zone characters are sliced off so
// only lowercase alphanumerics and dashes remain.
func FormatJobName(name string) string {
trimmed := strings.ReplaceAll(name, " ", "")
noColons := strings.ReplaceAll(trimmed, ":", "-")
noPeriods := strings.ReplaceAll(noColons, ".", "")
return strings.ReplaceAll(noPeriods, "+", "")[:len(noPeriods)-6]
}
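// For example, FormatJobName("2022-04-03 13:49:00.123456789 +0000 UTC") yields a
// suffix along the lines of "2022-04-0313-49-0012345678900"; the exact digits vary
// because the fractional-second portion of the timestamp differs on every call.
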
func SendResponse(key string, value string, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
response := make(map[string]string)
response[key] = value
jsonResponse, err := json.Marshal(response)
if err != nil {
log.Printf("Error in JSON marshal. Err: %s\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(jsonResponse)
}
// getPort reads the PORT environment variable; it returns 0 when PORT is unset
// or not a number, which ServerOpts.initDefaults later replaces with 8080.
func getPort() uint32 {
if port, err := strconv.Atoi(os.Getenv("PORT")); err == nil {
return uint32(port)
} else {
return 0
}
}
func main() {
if _, err := NewControllerServer(context.TODO(), &K8s{}, &ServerOpts{
Port: getPort(),
}); err != nil {
log.Fatal(err)
}
// NewControllerServer starts the HTTP server in a background goroutine, so block here to keep the process alive.
select {}
}