Created
April 8, 2025 19:54
-
-
Save tkellen/9cf57979a42369c73912280bec4e8248 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controller | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"net/http" | |
"net/url" | |
"time" | |
appsv1 "k8s.io/api/apps/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/watch" | |
"k8s.io/client-go/kubernetes" | |
) | |
// Annotation keys looked up on each observed ReplicaSet.
const (
	// hrefAnnotationKey names the annotation whose value becomes the marker's URL.
	hrefAnnotationKey = "honeycomb.io/href"
	// datasetAnnotationKey names the annotation whose value selects the dataset
	// the marker is posted to.
	datasetAnnotationKey = "honeycomb.io/dataset"
)
// marker is the JSON body sent when creating a marker. Every field is tagged
// omitempty, so zero-valued fields are left out of the encoded payload.
type marker struct {
	ID string `json:"id,omitempty"`
	// StartTime and EndTime are filled from time.Now().Unix() by
	// processReplicaSet, i.e. Unix seconds.
	StartTime int64  `json:"start_time,omitempty"`
	EndTime   int64  `json:"end_time,omitempty"`
	Message   string `json:"message,omitempty"`
	Type      string `json:"type,omitempty"`
	URL       string `json:"url,omitempty"`
	Color     string `json:"color,omitempty"`
}
type DeploymentMarkerMaker struct { | |
K8s kubernetes.Interface | |
WriteKey string | |
APIHost string | |
Port string | |
ObservationDelaySecs int | |
Logger *log.Logger | |
Ctx context.Context | |
Cancel context.CancelFunc | |
} | |
func (mm *DeploymentMarkerMaker) Start() error { | |
go mm.watchReplicaSets(time.Now().Add(time.Duration(mm.ObservationDelaySecs) * time.Second)) | |
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { | |
select { | |
case <-mm.Ctx.Done(): | |
mm.Logger.Println("watchReplicaSets cancelled") | |
w.WriteHeader(http.StatusInternalServerError) | |
fmt.Fprint(w, "watchReplicaSets cancelled") | |
return | |
default: | |
w.WriteHeader(http.StatusOK) | |
fmt.Fprint(w, "OK") | |
} | |
}) | |
mm.Logger.Printf("Starting HTTP server on port %s for liveness probe...", mm.Port) | |
if err := http.ListenAndServe(":"+mm.Port, nil); err != nil { | |
return fmt.Errorf("failed to start HTTP server: %w", err) | |
} | |
return nil | |
} | |
func (mm *DeploymentMarkerMaker) watchReplicaSets(observationStart time.Time) { | |
watcher, err := mm.K8s.AppsV1().ReplicaSets(metav1.NamespaceAll).Watch(mm.Ctx, metav1.ListOptions{}) | |
if err != nil { | |
mm.Logger.Fatalf("error watching ReplicaSets: %v", err) | |
} | |
defer func() { | |
watcher.Stop() | |
mm.Logger.Println("Stopped watching ReplicaSets. Cancelling context.") | |
mm.Cancel() | |
}() | |
for event := range watcher.ResultChan() { | |
if event.Type != watch.Added { | |
continue | |
} | |
rs, ok := event.Object.(*appsv1.ReplicaSet) | |
if !ok { | |
mm.Logger.Println("unexpected object in watch event") | |
continue | |
} | |
if time.Now().Before(observationStart) { | |
mm.Logger.Printf("Skipping %s/%s: before observation start", rs.Namespace, rs.Name) | |
continue | |
} | |
mm.processReplicaSet(rs) | |
} | |
} | |
func (mm *DeploymentMarkerMaker) processReplicaSet(rs *appsv1.ReplicaSet) { | |
annotations := rs.GetAnnotations() | |
href, ok := annotations[hrefAnnotationKey] | |
if !ok { | |
mm.Logger.Printf("No href annotation for %s/%s", rs.Namespace, rs.Name) | |
return | |
} | |
timestamp := time.Now().Unix() | |
m := marker{ | |
StartTime: timestamp, | |
EndTime: timestamp, | |
Message: fmt.Sprintf("Replicaset added (%s/%s)", rs.Namespace, rs.Name), | |
Type: "deploy", | |
URL: href, | |
Color: "green", | |
} | |
dataset, ok := annotations[datasetAnnotationKey] | |
if !ok { | |
mm.Logger.Printf("No dataset annotation for %s/%s", rs.Namespace, rs.Name) | |
return | |
} | |
if err := mm.CreateMarker(m, dataset); err != nil { | |
mm.Logger.Printf("Failed to create marker for %s/%s: %v", rs.Namespace, rs.Name, err) | |
} | |
} | |
func (mm *DeploymentMarkerMaker) CreateMarker(marker marker, dataset string) error { | |
blob, err := json.Marshal(marker) | |
if err != nil { | |
return fmt.Errorf("marshal error: %w", err) | |
} | |
mm.Logger.Printf("Creating marker: %s for dataset %s", blob, dataset) | |
endpoint, err := url.Parse(mm.APIHost) | |
if err != nil { | |
return fmt.Errorf("invalid API host: %w", err) | |
} | |
endpoint.Path = "/1/markers/" + dataset | |
req, err := http.NewRequest("POST", endpoint.String(), bytes.NewBuffer(blob)) | |
if err != nil { | |
return fmt.Errorf("failed to create request: %w", err) | |
} | |
req.Header.Set("Content-Type", "application/json") | |
req.Header.Set("X-Honeycomb-Team", mm.WriteKey) | |
client := &http.Client{Timeout: 10 * time.Second} | |
resp, err := client.Do(req) | |
if err != nil { | |
return fmt.Errorf("request error: %w", err) | |
} | |
defer resp.Body.Close() | |
body, err := io.ReadAll(resp.Body) | |
if err != nil { | |
return fmt.Errorf("failed to read response: %w", err) | |
} | |
if resp.StatusCode < 200 || resp.StatusCode >= 300 { | |
return errors.New(fmt.Sprintf("Honeycomb error %d: %s", resp.StatusCode, body)) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment