Skip to content

Instantly share code, notes, and snippets.

@tkellen
Created April 8, 2025 19:54
Show Gist options
  • Save tkellen/9cf57979a42369c73912280bec4e8248 to your computer and use it in GitHub Desktop.
Save tkellen/9cf57979a42369c73912280bec4e8248 to your computer and use it in GitHub Desktop.
package controller
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"time"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
const (
hrefAnnotationKey = "honeycomb.io/href"
datasetAnnotationKey = "honeycomb.io/dataset"
)
type marker struct {
ID string `json:"id,omitempty"`
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
Message string `json:"message,omitempty"`
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
Color string `json:"color,omitempty"`
}
type DeploymentMarkerMaker struct {
K8s kubernetes.Interface
WriteKey string
APIHost string
Port string
ObservationDelaySecs int
Logger *log.Logger
Ctx context.Context
Cancel context.CancelFunc
}
func (mm *DeploymentMarkerMaker) Start() error {
go mm.watchReplicaSets(time.Now().Add(time.Duration(mm.ObservationDelaySecs) * time.Second))
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
select {
case <-mm.Ctx.Done():
mm.Logger.Println("watchReplicaSets cancelled")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "watchReplicaSets cancelled")
return
default:
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "OK")
}
})
mm.Logger.Printf("Starting HTTP server on port %s for liveness probe...", mm.Port)
if err := http.ListenAndServe(":"+mm.Port, nil); err != nil {
return fmt.Errorf("failed to start HTTP server: %w", err)
}
return nil
}
func (mm *DeploymentMarkerMaker) watchReplicaSets(observationStart time.Time) {
watcher, err := mm.K8s.AppsV1().ReplicaSets(metav1.NamespaceAll).Watch(mm.Ctx, metav1.ListOptions{})
if err != nil {
mm.Logger.Fatalf("error watching ReplicaSets: %v", err)
}
defer func() {
watcher.Stop()
mm.Logger.Println("Stopped watching ReplicaSets. Cancelling context.")
mm.Cancel()
}()
for event := range watcher.ResultChan() {
if event.Type != watch.Added {
continue
}
rs, ok := event.Object.(*appsv1.ReplicaSet)
if !ok {
mm.Logger.Println("unexpected object in watch event")
continue
}
if time.Now().Before(observationStart) {
mm.Logger.Printf("Skipping %s/%s: before observation start", rs.Namespace, rs.Name)
continue
}
mm.processReplicaSet(rs)
}
}
func (mm *DeploymentMarkerMaker) processReplicaSet(rs *appsv1.ReplicaSet) {
annotations := rs.GetAnnotations()
href, ok := annotations[hrefAnnotationKey]
if !ok {
mm.Logger.Printf("No href annotation for %s/%s", rs.Namespace, rs.Name)
return
}
timestamp := time.Now().Unix()
m := marker{
StartTime: timestamp,
EndTime: timestamp,
Message: fmt.Sprintf("Replicaset added (%s/%s)", rs.Namespace, rs.Name),
Type: "deploy",
URL: href,
Color: "green",
}
dataset, ok := annotations[datasetAnnotationKey]
if !ok {
mm.Logger.Printf("No dataset annotation for %s/%s", rs.Namespace, rs.Name)
return
}
if err := mm.CreateMarker(m, dataset); err != nil {
mm.Logger.Printf("Failed to create marker for %s/%s: %v", rs.Namespace, rs.Name, err)
}
}
func (mm *DeploymentMarkerMaker) CreateMarker(marker marker, dataset string) error {
blob, err := json.Marshal(marker)
if err != nil {
return fmt.Errorf("marshal error: %w", err)
}
mm.Logger.Printf("Creating marker: %s for dataset %s", blob, dataset)
endpoint, err := url.Parse(mm.APIHost)
if err != nil {
return fmt.Errorf("invalid API host: %w", err)
}
endpoint.Path = "/1/markers/" + dataset
req, err := http.NewRequest("POST", endpoint.String(), bytes.NewBuffer(blob))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Honeycomb-Team", mm.WriteKey)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("request error: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.New(fmt.Sprintf("Honeycomb error %d: %s", resp.StatusCode, body))
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment