Created
February 18, 2026 17:10
-
-
Save thc1006/daa56f0b5267094578797afbcf194a48 to your computer and use it in GitHub Desktop.
Unit tests for PR #336 (parallel KRM function evaluator pods) — podcachemanager.go & podmanager.go coverage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright 2025 The Nephio Authors. | |
| // | |
| // Licensed under the Apache License, Version 2.0 (the "License"); | |
| // you may not use this file except in compliance with the License. | |
| // You may obtain a copy of the License at | |
| // | |
| // http://www.apache.org/licenses/LICENSE-2.0 | |
| // | |
| // Unless required by applicable law or agreed to in writing, software | |
| // distributed under the License is distributed on an "AS IS" BASIS, | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| // See the License for the specific language governing permissions and | |
| // limitations under the License. | |
| package internal | |
import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
| // makePodInfoWithLoad creates a functionPodInfo with the specified concurrent evaluation count. | |
| func makePodInfoWithLoad(load int32) functionPodInfo { | |
| counter := &atomic.Int32{} | |
| counter.Store(load) | |
| return functionPodInfo{ | |
| concurrentEvaluations: counter, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| lastActivity: time.Now(), | |
| waitlist: []chan<- *connectionResponse{}, | |
| } | |
| } | |
| // makeReadyPodInfo creates a functionPodInfo with podData, for testing functions that require a ready pod. | |
| func makeReadyPodInfo(image string, podKey, serviceKey client.ObjectKey, grpcConn *grpc.ClientConn, load int32) functionPodInfo { | |
| counter := &atomic.Int32{} | |
| counter.Store(load) | |
| return functionPodInfo{ | |
| podData: &podData{ | |
| image: image, | |
| podKey: &podKey, | |
| serviceKey: &serviceKey, | |
| grpcConnection: grpcConn, | |
| }, | |
| concurrentEvaluations: counter, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| lastActivity: time.Now(), | |
| waitlist: []chan<- *connectionResponse{}, | |
| } | |
| } | |
| func TestFindBestPod(t *testing.T) { | |
| pcm := &podCacheManager{} | |
| tests := []struct { | |
| name string | |
| fn *functionInfo | |
| expectedIdx int | |
| expectedWaitlist int | |
| }{ | |
| { | |
| name: "nil function info returns -1", | |
| fn: nil, | |
| expectedIdx: -1, | |
| expectedWaitlist: 0, | |
| }, | |
| { | |
| name: "empty pods returns -1", | |
| fn: &functionInfo{pods: []functionPodInfo{}}, | |
| expectedIdx: -1, | |
| expectedWaitlist: 0, | |
| }, | |
| { | |
| name: "single pod with no load", | |
| fn: &functionInfo{pods: []functionPodInfo{ | |
| makePodInfoWithLoad(0), | |
| }}, | |
| expectedIdx: 0, | |
| expectedWaitlist: 0, | |
| }, | |
| { | |
| name: "single pod with load", | |
| fn: &functionInfo{pods: []functionPodInfo{ | |
| makePodInfoWithLoad(5), | |
| }}, | |
| expectedIdx: 0, | |
| expectedWaitlist: 5, | |
| }, | |
| { | |
| name: "multiple pods selects least loaded", | |
| fn: &functionInfo{pods: []functionPodInfo{ | |
| makePodInfoWithLoad(5), | |
| makePodInfoWithLoad(1), | |
| makePodInfoWithLoad(8), | |
| }}, | |
| expectedIdx: 1, | |
| expectedWaitlist: 1, | |
| }, | |
| { | |
| name: "multiple pods with equal load selects first", | |
| fn: &functionInfo{pods: []functionPodInfo{ | |
| makePodInfoWithLoad(3), | |
| makePodInfoWithLoad(3), | |
| makePodInfoWithLoad(3), | |
| }}, | |
| expectedIdx: 0, | |
| expectedWaitlist: 3, | |
| }, | |
| { | |
| name: "last pod is least loaded", | |
| fn: &functionInfo{pods: []functionPodInfo{ | |
| makePodInfoWithLoad(10), | |
| makePodInfoWithLoad(7), | |
| makePodInfoWithLoad(2), | |
| }}, | |
| expectedIdx: 2, | |
| expectedWaitlist: 2, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| idx, waitlist := pcm.findBestPod(tt.fn) | |
| assert.Equal(t, tt.expectedIdx, idx) | |
| assert.Equal(t, tt.expectedWaitlist, waitlist) | |
| }) | |
| } | |
| } | |
| func TestGetParamsForImage(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| maxWaitlistLength: 2, | |
| maxParallelPodsPerFunction: 3, | |
| configMap: map[string]podCacheConfigEntry{ | |
| "full-override": { | |
| Name: "full-override", | |
| TimeToLive: "5m", | |
| MaxWaitlistLength: 10, | |
| MaxParallelPodsPerFunction: 5, | |
| }, | |
| "partial-override": { | |
| Name: "partial-override", | |
| TimeToLive: "3m", | |
| MaxWaitlistLength: 0, // zero -> falls back to default | |
| }, | |
| "invalid-ttl": { | |
| Name: "invalid-ttl", | |
| TimeToLive: "not-a-duration", | |
| MaxWaitlistLength: 4, | |
| MaxParallelPodsPerFunction: 2, | |
| }, | |
| "zero-ttl": { | |
| Name: "zero-ttl", | |
| TimeToLive: "0s", | |
| MaxWaitlistLength: 1, | |
| MaxParallelPodsPerFunction: 1, | |
| }, | |
| }, | |
| } | |
| tests := []struct { | |
| name string | |
| image string | |
| expectedTTL time.Duration | |
| expectedWaitlist int | |
| expectedMaxPods int | |
| }{ | |
| { | |
| name: "full override from configMap", | |
| image: "full-override", | |
| expectedTTL: 5 * time.Minute, | |
| expectedWaitlist: 10, | |
| expectedMaxPods: 5, | |
| }, | |
| { | |
| name: "partial override falls back to defaults for zero values", | |
| image: "partial-override", | |
| expectedTTL: 3 * time.Minute, | |
| expectedWaitlist: 2, // default | |
| expectedMaxPods: 3, // default | |
| }, | |
| { | |
| name: "invalid TTL falls back to default TTL", | |
| image: "invalid-ttl", | |
| expectedTTL: 10 * time.Minute, // default | |
| expectedWaitlist: 4, | |
| expectedMaxPods: 2, | |
| }, | |
| { | |
| name: "zero TTL falls back to default TTL", | |
| image: "zero-ttl", | |
| expectedTTL: 10 * time.Minute, // default | |
| expectedWaitlist: 1, | |
| expectedMaxPods: 1, | |
| }, | |
| { | |
| name: "image not in configMap uses all defaults", | |
| image: "unknown-image", | |
| expectedTTL: 10 * time.Minute, | |
| expectedWaitlist: 2, | |
| expectedMaxPods: 3, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| ttl, maxWaitlist, maxPods := pcm.getParamsForImage(tt.image) | |
| assert.Equal(t, tt.expectedTTL, ttl) | |
| assert.Equal(t, tt.expectedWaitlist, maxWaitlist) | |
| assert.Equal(t, tt.expectedMaxPods, maxPods) | |
| }) | |
| } | |
| } | |
| func TestLoadPodCacheConfig(t *testing.T) { | |
| t.Run("valid config file", func(t *testing.T) { | |
| content := ` | |
| - name: "gcr.io/kpt-fn/apply-replacements" | |
| timeToLive: "30m" | |
| maxWaitlistLength: 5 | |
| maxParallelPodsPerFunction: 3 | |
| - name: "gcr.io/kpt-fn/set-namespace" | |
| timeToLive: "10m" | |
| maxWaitlistLength: 2 | |
| maxParallelPodsPerFunction: 1 | |
| ` | |
| tmpFile, err := os.CreateTemp("", "pod-cache-config-*.yaml") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString(content) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| configMap, err := loadPodCacheConfig(tmpFile.Name()) | |
| require.NoError(t, err) | |
| assert.Len(t, configMap, 2) | |
| entry, ok := configMap["gcr.io/kpt-fn/apply-replacements"] | |
| assert.True(t, ok) | |
| assert.Equal(t, "30m", entry.TimeToLive) | |
| assert.Equal(t, 5, entry.MaxWaitlistLength) | |
| assert.Equal(t, 3, entry.MaxParallelPodsPerFunction) | |
| entry2, ok := configMap["gcr.io/kpt-fn/set-namespace"] | |
| assert.True(t, ok) | |
| assert.Equal(t, "10m", entry2.TimeToLive) | |
| }) | |
| t.Run("empty config file", func(t *testing.T) { | |
| tmpFile, err := os.CreateTemp("", "pod-cache-config-empty-*.yaml") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString("[]") | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| configMap, err := loadPodCacheConfig(tmpFile.Name()) | |
| require.NoError(t, err) | |
| assert.Empty(t, configMap) | |
| }) | |
| t.Run("invalid YAML", func(t *testing.T) { | |
| tmpFile, err := os.CreateTemp("", "pod-cache-config-invalid-*.yaml") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString("{{invalid yaml") | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| _, err = loadPodCacheConfig(tmpFile.Name()) | |
| assert.Error(t, err) | |
| }) | |
| t.Run("missing file", func(t *testing.T) { | |
| _, err := loadPodCacheConfig("/nonexistent/path/config.yaml") | |
| assert.Error(t, err) | |
| }) | |
| } | |
| func TestNewPodInfo(t *testing.T) { | |
| t.Run("nil channel creates pod with empty waitlist", func(t *testing.T) { | |
| pod := NewPodInfo(nil) | |
| assert.Nil(t, pod.podData) | |
| assert.Empty(t, pod.waitlist) | |
| assert.Equal(t, int32(0), pod.concurrentEvaluations.Load()) | |
| assert.NotNil(t, pod.fnEvaluationMutex) | |
| }) | |
| t.Run("non-nil channel adds to waitlist and increments counter", func(t *testing.T) { | |
| ch := make(chan *connectionResponse, 1) | |
| pod := NewPodInfo(ch) | |
| assert.Nil(t, pod.podData) | |
| assert.Len(t, pod.waitlist, 1) | |
| assert.Equal(t, int32(1), pod.concurrentEvaluations.Load()) | |
| }) | |
| } | |
| func TestSendResponse(t *testing.T) { | |
| t.Run("sends error when err is not nil", func(t *testing.T) { | |
| pod := &functionPodInfo{ | |
| podData: &podData{image: "test"}, | |
| concurrentEvaluations: &atomic.Int32{}, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| } | |
| ch := make(chan *connectionResponse, 1) | |
| testErr := fmt.Errorf("test error") | |
| pod.SendResponse(ch, testErr) | |
| resp := <-ch | |
| assert.Error(t, resp.err) | |
| assert.Equal(t, "test error", resp.err.Error()) | |
| }) | |
| t.Run("sends error when podData is nil", func(t *testing.T) { | |
| pod := &functionPodInfo{ | |
| podData: nil, | |
| concurrentEvaluations: &atomic.Int32{}, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| } | |
| ch := make(chan *connectionResponse, 1) | |
| pod.SendResponse(ch, nil) | |
| resp := <-ch | |
| assert.Error(t, resp.err) | |
| assert.Contains(t, resp.err.Error(), "pod is not ready") | |
| }) | |
| t.Run("sends success with podData", func(t *testing.T) { | |
| conn, err := grpc.NewClient("localhost:9446", grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| require.NoError(t, err) | |
| podKey := client.ObjectKey{Name: "test-pod", Namespace: "test-ns"} | |
| serviceKey := client.ObjectKey{Name: "test-svc", Namespace: "test-ns"} | |
| pod := &functionPodInfo{ | |
| podData: &podData{ | |
| image: "test-image", | |
| grpcConnection: conn, | |
| podKey: &podKey, | |
| serviceKey: &serviceKey, | |
| }, | |
| concurrentEvaluations: &atomic.Int32{}, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| } | |
| ch := make(chan *connectionResponse, 1) | |
| pod.SendResponse(ch, nil) | |
| resp := <-ch | |
| assert.NoError(t, resp.err) | |
| assert.Equal(t, "test-image", resp.podData.image) | |
| assert.NotNil(t, resp.grpcConnection) | |
| assert.NotNil(t, resp.fnEvaluationMutex) | |
| assert.NotNil(t, resp.concurrentEvaluations) | |
| }) | |
| } | |
| func TestWaitlistLen(t *testing.T) { | |
| counter := &atomic.Int32{} | |
| counter.Store(7) | |
| pod := functionPodInfo{ | |
| concurrentEvaluations: counter, | |
| } | |
| assert.Equal(t, 7, pod.WaitlistLen()) | |
| counter.Store(0) | |
| assert.Equal(t, 0, pod.WaitlistLen()) | |
| } | |
| func TestFunctionInfo(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| functions: map[string]*functionInfo{}, | |
| } | |
| t.Run("creates new entry for unknown image", func(t *testing.T) { | |
| fn := pcm.FunctionInfo("new-image") | |
| assert.NotNil(t, fn) | |
| assert.Empty(t, fn.pods) | |
| // Verify it was stored in the map | |
| stored, ok := pcm.functions["new-image"] | |
| assert.True(t, ok) | |
| assert.Equal(t, fn, stored) | |
| }) | |
| t.Run("returns existing entry for known image", func(t *testing.T) { | |
| existing := &functionInfo{pods: []functionPodInfo{makePodInfoWithLoad(1)}} | |
| pcm.functions["existing-image"] = existing | |
| fn := pcm.FunctionInfo("existing-image") | |
| assert.Equal(t, existing, fn) | |
| assert.Len(t, fn.pods, 1) | |
| }) | |
| } | |
| func TestForEachConcurrently(t *testing.T) { | |
| entries := []podCacheConfigEntry{ | |
| {Name: "fn-1"}, | |
| {Name: "fn-2"}, | |
| {Name: "fn-3"}, | |
| } | |
| var mu sync.Mutex | |
| visited := make(map[string]bool) | |
| forEachConcurrently(entries, func(entry podCacheConfigEntry) { | |
| mu.Lock() | |
| defer mu.Unlock() | |
| visited[entry.Name] = true | |
| }) | |
| assert.Len(t, visited, 3) | |
| assert.True(t, visited["fn-1"]) | |
| assert.True(t, visited["fn-2"]) | |
| assert.True(t, visited["fn-3"]) | |
| } | |
| func TestRemoveUnhealthyPods(t *testing.T) { | |
| const testNs = "test-ns" | |
| t.Run("nil function info is no-op", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| // Should not panic | |
| pcm.removeUnhealthyPods(nil, false) | |
| }) | |
| t.Run("pod under creation (nil podData) is kept", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| fn := &functionInfo{ | |
| pods: []functionPodInfo{ | |
| { | |
| podData: nil, // under creation | |
| concurrentEvaluations: &atomic.Int32{}, | |
| fnEvaluationMutex: &sync.Mutex{}, | |
| }, | |
| }, | |
| } | |
| pcm.removeUnhealthyPods(fn, false) | |
| assert.Len(t, fn.pods, 1, "pod under creation should be kept") | |
| }) | |
| t.Run("pod not found in k8s is removed", func(t *testing.T) { | |
| // Pod exists in cache but NOT in fake k8s client | |
| podKey := client.ObjectKey{Name: "gone-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "gone-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), // empty - no pods | |
| namespace: testNs, | |
| }, | |
| } | |
| fn := &functionInfo{ | |
| pods: []functionPodInfo{ | |
| makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0), | |
| }, | |
| } | |
| pcm.removeUnhealthyPods(fn, false) | |
| assert.Empty(t, fn.pods, "pod not found in k8s should be removed") | |
| }) | |
| t.Run("pod in Failed state is removed", func(t *testing.T) { | |
| podKey := client.ObjectKey{Name: "failed-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "failed-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "failed-pod", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodFailed}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "failed-svc", Namespace: testNs}, | |
| } | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| } | |
| fn := &functionInfo{ | |
| pods: []functionPodInfo{ | |
| makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0), | |
| }, | |
| } | |
| pcm.removeUnhealthyPods(fn, false) | |
| assert.Empty(t, fn.pods, "pod in Failed state should be removed") | |
| }) | |
| t.Run("healthy pod is kept", func(t *testing.T) { | |
| podKey := client.ObjectKey{Name: "healthy-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "healthy-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "healthy-pod", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodRunning}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "healthy-svc", Namespace: testNs}, | |
| } | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| } | |
| fn := &functionInfo{ | |
| pods: []functionPodInfo{ | |
| makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0), | |
| }, | |
| } | |
| pcm.removeUnhealthyPods(fn, false) | |
| assert.Len(t, fn.pods, 1, "healthy pod should be kept") | |
| }) | |
| t.Run("idle pod past TTL removed when removeIdle is true", func(t *testing.T) { | |
| podKey := client.ObjectKey{Name: "idle-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "idle-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "idle-pod", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodRunning}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "idle-svc", Namespace: testNs}, | |
| } | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| } | |
| podInfo := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) | |
| podInfo.lastActivity = time.Now().Add(-15 * time.Minute) // past TTL | |
| fn := &functionInfo{pods: []functionPodInfo{podInfo}} | |
| pcm.removeUnhealthyPods(fn, true) | |
| assert.Empty(t, fn.pods, "idle pod past TTL should be removed when removeIdle=true") | |
| }) | |
| t.Run("idle pod past TTL kept when removeIdle is false", func(t *testing.T) { | |
| podKey := client.ObjectKey{Name: "idle-pod2", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "idle-svc2", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "idle-pod2", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodRunning}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "idle-svc2", Namespace: testNs}, | |
| } | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| } | |
| podInfo := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) | |
| podInfo.lastActivity = time.Now().Add(-15 * time.Minute) // past TTL | |
| fn := &functionInfo{pods: []functionPodInfo{podInfo}} | |
| pcm.removeUnhealthyPods(fn, false) | |
| assert.Len(t, fn.pods, 1, "idle pod past TTL should be kept when removeIdle=false") | |
| }) | |
| } | |
| func TestGarbageCollectorUnit(t *testing.T) { | |
| const testNs = "test-ns" | |
| podKey := client.ObjectKey{Name: "gc-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "gc-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "gc-pod", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodRunning}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "gc-svc", Namespace: testNs}, | |
| } | |
| t.Run("removes empty function entries from map", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podTTL: 1 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| functions: map[string]*functionInfo{ | |
| "expired-image": { | |
| pods: []functionPodInfo{ | |
| func() functionPodInfo { | |
| p := makeReadyPodInfo("expired-image", podKey, serviceKey, conn, 0) | |
| p.lastActivity = time.Now().Add(-5 * time.Minute) | |
| return p | |
| }(), | |
| }, | |
| }, | |
| }, | |
| } | |
| pcm.garbageCollector() | |
| // Allow background deletion goroutines to execute | |
| time.Sleep(100 * time.Millisecond) | |
| assert.Empty(t, pcm.functions, "expired function entry should be removed from map") | |
| }) | |
| } | |
| func TestRedistributeLoad(t *testing.T) { | |
| const testNs = "test-ns" | |
| t.Run("redistributes connections to available pods", func(t *testing.T) { | |
| podKey := client.ObjectKey{Name: "redist-pod", Namespace: testNs} | |
| serviceKey := client.ObjectKey{Name: "redist-svc", Namespace: testNs} | |
| serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix | |
| address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) | |
| conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "redist-pod", Namespace: testNs}, | |
| Status: corev1.PodStatus{Phase: corev1.PodRunning}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "redist-svc", Namespace: testNs}, | |
| } | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(), | |
| namespace: testNs, | |
| }, | |
| functions: map[string]*functionInfo{}, | |
| } | |
| readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) | |
| fn := &functionInfo{pods: []functionPodInfo{readyPod}} | |
| pcm.functions["test-image"] = fn | |
| // Create connection channels to redistribute | |
| ch1 := make(chan *connectionResponse, 1) | |
| ch2 := make(chan *connectionResponse, 1) | |
| result := pcm.redistributeLoad("test-image", fn, []chan<- *connectionResponse{ch1, ch2}) | |
| assert.True(t, result, "should redistribute successfully") | |
| // Both channels should receive responses | |
| resp1 := <-ch1 | |
| assert.NoError(t, resp1.err) | |
| resp2 := <-ch2 | |
| assert.NoError(t, resp2.err) | |
| }) | |
| t.Run("returns false when no pods available", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podTTL: 10 * time.Minute, | |
| configMap: map[string]podCacheConfigEntry{}, | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| namespace: testNs, | |
| }, | |
| functions: map[string]*functionInfo{}, | |
| } | |
| fn := &functionInfo{pods: []functionPodInfo{}} | |
| pcm.functions["empty-image"] = fn | |
| ch := make(chan *connectionResponse, 1) | |
| result := pcm.redistributeLoad("empty-image", fn, []chan<- *connectionResponse{ch}) | |
| assert.False(t, result, "should return false with no pods") | |
| }) | |
| } | |
| func TestDeletePodWithServiceInBackgroundByObjectKey(t *testing.T) { | |
| t.Run("deletes both pod and service", func(t *testing.T) { | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "pd-pod", Namespace: "test-ns"}, | |
| } | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "pd-svc", Namespace: "test-ns"}, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build() | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: kubeClient, | |
| }, | |
| } | |
| podKey := client.ObjectKeyFromObject(k8sPod) | |
| serviceKey := client.ObjectKeyFromObject(k8sSvc) | |
| pd := podData{ | |
| podKey: &podKey, | |
| serviceKey: &serviceKey, | |
| } | |
| pcm.DeletePodWithServiceInBackgroundByObjectKey(pd) | |
| time.Sleep(200 * time.Millisecond) | |
| var pod corev1.Pod | |
| err := kubeClient.Get(t.Context(), podKey, &pod) | |
| assert.Error(t, err, "pod should be deleted") | |
| var svc corev1.Service | |
| err = kubeClient.Get(t.Context(), serviceKey, &svc) | |
| assert.Error(t, err, "service should be deleted") | |
| }) | |
| t.Run("handles nil keys gracefully", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| pd := podData{ | |
| podKey: nil, | |
| serviceKey: nil, | |
| } | |
| // Should not panic | |
| pcm.DeletePodWithServiceInBackgroundByObjectKey(pd) | |
| time.Sleep(50 * time.Millisecond) | |
| }) | |
| } | |
| func TestDeletePodAndWait(t *testing.T) { | |
| t.Run("deletes pod and waits for removal", func(t *testing.T) { | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "wait-pod", Namespace: "test-ns"}, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(k8sPod).Build() | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| podReadyTimeout: 5 * time.Second, | |
| }, | |
| } | |
| err := pcm.deletePodAndWait(k8sPod) | |
| assert.NoError(t, err) | |
| var pod corev1.Pod | |
| getErr := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sPod), &pod) | |
| assert.Error(t, getErr, "pod should be deleted") | |
| }) | |
| } | |
| func TestEnsureCustomAuthSecret(t *testing.T) { | |
| dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}` | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString(dockerConfig) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| kubeClient := fake.NewClientBuilder().Build() | |
| pm := &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| } | |
| err = pm.ensureCustomAuthSecret(t.Context(), tmpFile.Name(), "test-secret") | |
| assert.NoError(t, err) | |
| // Verify secret was created | |
| var secret corev1.Secret | |
| err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret) | |
| assert.NoError(t, err) | |
| } | |
| func TestDeletePodInBackground(t *testing.T) { | |
| t.Run("nil pod does not panic", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| // Should not panic | |
| pcm.DeletePodInBackground(nil) | |
| time.Sleep(50 * time.Millisecond) | |
| }) | |
| t.Run("pod with empty name is skipped", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| pod := &corev1.Pod{} | |
| pcm.DeletePodInBackground(pod) | |
| time.Sleep(50 * time.Millisecond) | |
| }) | |
| t.Run("normal pod is deleted", func(t *testing.T) { | |
| k8sPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "to-delete", Namespace: "test-ns"}, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(k8sPod).Build() | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: kubeClient, | |
| }, | |
| } | |
| pcm.DeletePodInBackground(k8sPod) | |
| time.Sleep(100 * time.Millisecond) | |
| var pod corev1.Pod | |
| err := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sPod), &pod) | |
| assert.Error(t, err, "pod should be deleted") | |
| }) | |
| } | |
| func TestDeleteServiceInBackground(t *testing.T) { | |
| t.Run("nil service does not panic", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| pcm.DeleteServiceInBackground(nil) | |
| time.Sleep(50 * time.Millisecond) | |
| }) | |
| t.Run("service with empty name is skipped", func(t *testing.T) { | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: fake.NewClientBuilder().Build(), | |
| }, | |
| } | |
| svc := &corev1.Service{} | |
| pcm.DeleteServiceInBackground(svc) | |
| time.Sleep(50 * time.Millisecond) | |
| }) | |
| t.Run("normal service is deleted", func(t *testing.T) { | |
| k8sSvc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "to-delete-svc", Namespace: "test-ns"}, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(k8sSvc).Build() | |
| pcm := &podCacheManager{ | |
| podManager: &podManager{ | |
| kubeClient: kubeClient, | |
| }, | |
| } | |
| pcm.DeleteServiceInBackground(k8sSvc) | |
| time.Sleep(100 * time.Millisecond) | |
| var svc corev1.Service | |
| err := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sSvc), &svc) | |
| assert.Error(t, err, "service should be deleted") | |
| }) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright 2025 The Nephio Authors. | |
| // | |
| // Licensed under the Apache License, Version 2.0 (the "License"); | |
| // you may not use this file except in compliance with the License. | |
| // You may obtain a copy of the License at | |
| // | |
| // http://www.apache.org/licenses/LICENSE-2.0 | |
| // | |
| // Unless required by applicable law or agreed to in writing, software | |
| // distributed under the License is distributed on an "AS IS" BASIS, | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| // See the License for the specific language governing permissions and | |
| // limitations under the License. | |
| package internal | |
| import ( | |
| "crypto/ecdsa" | |
| "crypto/elliptic" | |
| "crypto/rand" | |
| "crypto/x509" | |
| "crypto/x509/pkix" | |
| "encoding/pem" | |
| "math/big" | |
| "net" | |
| "os" | |
| "path/filepath" | |
| "testing" | |
| "time" | |
| "github.com/google/go-containerregistry/pkg/name" | |
| "github.com/stretchr/testify/assert" | |
| "github.com/stretchr/testify/require" | |
| corev1 "k8s.io/api/core/v1" | |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
| "sigs.k8s.io/controller-runtime/pkg/client" | |
| "sigs.k8s.io/controller-runtime/pkg/client/fake" | |
| ) | |
| func TestPodID(t *testing.T) { | |
| tests := []struct { | |
| name string | |
| image string | |
| hash string | |
| postFix string | |
| expected string | |
| expectError bool | |
| }{ | |
| { | |
| name: "standard image with tag", | |
| image: "gcr.io/kpt-fn/apply-replacements:latest", | |
| hash: "5245a52778d684fa698f69861fb2e058b308f6a74fed5bf2fe77d97bad5e071c", | |
| postFix: "1", | |
| expected: "apply-replacements-latest-1-5245a527", | |
| }, | |
| { | |
| name: "image without explicit tag defaults to latest", | |
| image: "gcr.io/kpt-fn/set-namespace", | |
| hash: "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890", | |
| postFix: "2", | |
| expected: "set-namespace-latest-2-abcdef12", | |
| }, | |
| { | |
| name: "image with version tag containing dots", | |
| image: "gcr.io/kpt-fn/set-labels:v0.4.1", | |
| hash: "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", | |
| postFix: "1", | |
| expected: "set-labels-v041-1-12345678", | |
| }, | |
| { | |
| name: "image with underscores in name", | |
| image: "gcr.io/kpt-fn/my_function:latest", | |
| hash: "aabbccddee112233aabbccddee112233aabbccddee112233aabbccddee112233", | |
| postFix: "1", | |
| expected: "my-function-latest-1-aabbccdd", | |
| }, | |
| { | |
| name: "invalid image reference", | |
| image: "invalid@oci@ref", | |
| hash: "deadbeef", | |
| postFix: "1", | |
| expectError: true, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| result, err := podID(tt.image, tt.hash, tt.postFix) | |
| if tt.expectError { | |
| assert.Error(t, err) | |
| } else { | |
| require.NoError(t, err) | |
| assert.Equal(t, tt.expected, result) | |
| } | |
| }) | |
| } | |
| } | |
| func TestIsPodTemplateSameVersion(t *testing.T) { | |
| tests := []struct { | |
| name string | |
| pod *corev1.Pod | |
| templateVersion string | |
| expected bool | |
| }{ | |
| { | |
| name: "matching version", | |
| pod: &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Annotations: map[string]string{ | |
| templateVersionAnnotation: "v1", | |
| }, | |
| }, | |
| }, | |
| templateVersion: "v1", | |
| expected: true, | |
| }, | |
| { | |
| name: "mismatching version", | |
| pod: &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Annotations: map[string]string{ | |
| templateVersionAnnotation: "v1", | |
| }, | |
| }, | |
| }, | |
| templateVersion: "v2", | |
| expected: false, | |
| }, | |
| { | |
| name: "no annotation", | |
| pod: &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Annotations: map[string]string{}, | |
| }, | |
| }, | |
| templateVersion: "v1", | |
| expected: false, | |
| }, | |
| { | |
| name: "nil annotations", | |
| pod: &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{}, | |
| }, | |
| templateVersion: "v1", | |
| expected: false, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| result := isPodTemplateSameVersion(tt.pod, tt.templateVersion) | |
| assert.Equal(t, tt.expected, result) | |
| }) | |
| } | |
| } | |
| func TestHasImagePullSecret(t *testing.T) { | |
| tests := []struct { | |
| name string | |
| pod *corev1.Pod | |
| secretName string | |
| expected bool | |
| }{ | |
| { | |
| name: "secret found", | |
| pod: &corev1.Pod{ | |
| Spec: corev1.PodSpec{ | |
| ImagePullSecrets: []corev1.LocalObjectReference{ | |
| {Name: "my-secret"}, | |
| {Name: "other-secret"}, | |
| }, | |
| }, | |
| }, | |
| secretName: "my-secret", | |
| expected: true, | |
| }, | |
| { | |
| name: "secret not found", | |
| pod: &corev1.Pod{ | |
| Spec: corev1.PodSpec{ | |
| ImagePullSecrets: []corev1.LocalObjectReference{ | |
| {Name: "other-secret"}, | |
| }, | |
| }, | |
| }, | |
| secretName: "my-secret", | |
| expected: false, | |
| }, | |
| { | |
| name: "empty secrets list", | |
| pod: &corev1.Pod{ | |
| Spec: corev1.PodSpec{ | |
| ImagePullSecrets: []corev1.LocalObjectReference{}, | |
| }, | |
| }, | |
| secretName: "my-secret", | |
| expected: false, | |
| }, | |
| { | |
| name: "nil secrets list", | |
| pod: &corev1.Pod{ | |
| Spec: corev1.PodSpec{}, | |
| }, | |
| secretName: "my-secret", | |
| expected: false, | |
| }, | |
| } | |
| for _, tt := range tests { | |
| t.Run(tt.name, func(t *testing.T) { | |
| result := hasImagePullSecret(tt.pod, tt.secretName) | |
| assert.Equal(t, tt.expected, result) | |
| }) | |
| } | |
| } | |
// TestPatchNewPodContainer verifies patchNewPodContainer, which rewrites the
// KRM function container of a pod template to run the given function image
// behind the wrapper server.
func TestPatchNewPodContainer(t *testing.T) {
	pm := &podManager{maxGrpcMessageSize: 4 * 1024 * 1024}
	t.Run("patches function container successfully", func(t *testing.T) {
		// Pod with a single container named functionContainerName, mimicking
		// the evaluator pod template.
		pod := &corev1.Pod{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:    functionContainerName,
						Image:   "to-be-replaced",
						Command: []string{filepath.Join(volumeMountPath, wrapperServerBin)},
					},
				},
			},
		}
		de := digestAndEntrypoint{
			digest:     "abc123",
			entrypoint: []string{"/my-function"},
		}
		err := pm.patchNewPodContainer(pod, de, "gcr.io/kpt-fn/my-function:latest")
		require.NoError(t, err)
		container := pod.Spec.Containers[0]
		// The image is swapped in, and the wrapper-server args carry the port
		// flag, the "--" separator, and the image entrypoint.
		assert.Equal(t, "gcr.io/kpt-fn/my-function:latest", container.Image)
		assert.Contains(t, container.Args, "--port")
		assert.Contains(t, container.Args, defaultWrapperServerPort)
		assert.Contains(t, container.Args, "--")
		assert.Contains(t, container.Args, "/my-function")
	})
	t.Run("returns error when function container not found", func(t *testing.T) {
		// No container carries functionContainerName, so patching must fail
		// with an error that names the expected container.
		pod := &corev1.Pod{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "not-a-function",
						Image: "some-image",
					},
				},
			},
		}
		de := digestAndEntrypoint{
			digest:     "abc123",
			entrypoint: []string{"/entry"},
		}
		err := pm.patchNewPodContainer(pod, de, "test-image")
		assert.Error(t, err)
		assert.Contains(t, err.Error(), functionContainerName)
	})
	t.Run("handles multiple containers", func(t *testing.T) {
		// Only the function container is patched; sidecars stay untouched.
		pod := &corev1.Pod{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{Name: "sidecar", Image: "sidecar-image"},
					{
						Name:    functionContainerName,
						Image:   "to-be-replaced",
						Command: []string{filepath.Join(volumeMountPath, wrapperServerBin)},
					},
				},
			},
		}
		de := digestAndEntrypoint{
			digest:     "abc123",
			entrypoint: []string{"/fn"},
		}
		err := pm.patchNewPodContainer(pod, de, "my-image")
		require.NoError(t, err)
		assert.Equal(t, "sidecar-image", pod.Spec.Containers[0].Image, "sidecar should be unchanged")
		assert.Equal(t, "my-image", pod.Spec.Containers[1].Image, "function container should be patched")
	})
}
| func TestPatchNewPodMetadata(t *testing.T) { | |
| pm := &podManager{namespace: "test-ns"} | |
| t.Run("sets namespace labels and annotations from scratch", func(t *testing.T) { | |
| pod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{}, | |
| } | |
| pm.patchNewPodMetadata(pod, "my-pod-id", "v2") | |
| assert.Equal(t, "test-ns", pod.Namespace) | |
| assert.Equal(t, "v2", pod.Annotations[templateVersionAnnotation]) | |
| assert.Equal(t, "my-pod-id", pod.Labels[krmFunctionImageLabel]) | |
| }) | |
| t.Run("preserves existing annotations and labels", func(t *testing.T) { | |
| pod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Annotations: map[string]string{"existing-key": "existing-val"}, | |
| Labels: map[string]string{"existing-label": "label-val"}, | |
| }, | |
| } | |
| pm.patchNewPodMetadata(pod, "pod-id", "v1") | |
| assert.Equal(t, "existing-val", pod.Annotations["existing-key"], "existing annotations preserved") | |
| assert.Equal(t, "v1", pod.Annotations[templateVersionAnnotation], "template version set") | |
| assert.Equal(t, "label-val", pod.Labels["existing-label"], "existing labels preserved") | |
| assert.Equal(t, "pod-id", pod.Labels[krmFunctionImageLabel], "function image label set") | |
| }) | |
| } | |
| func TestAppendImagePullSecret(t *testing.T) { | |
| t.Run("appends secret for private registry image", func(t *testing.T) { | |
| pm := &podManager{ | |
| enablePrivateRegistries: true, | |
| registryAuthSecretName: "my-auth-secret", | |
| } | |
| pod := &corev1.Pod{ | |
| Spec: corev1.PodSpec{}, | |
| } | |
| pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod) | |
| require.Len(t, pod.Spec.ImagePullSecrets, 1) | |
| assert.Equal(t, "my-auth-secret", pod.Spec.ImagePullSecrets[0].Name) | |
| }) | |
| t.Run("does not append if secret already exists", func(t *testing.T) { | |
| pm := &podManager{ | |
| enablePrivateRegistries: true, | |
| registryAuthSecretName: "my-auth-secret", | |
| } | |
| pod := &corev1.Pod{ | |
| Spec: corev1.PodSpec{ | |
| ImagePullSecrets: []corev1.LocalObjectReference{ | |
| {Name: "my-auth-secret"}, | |
| }, | |
| }, | |
| } | |
| pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod) | |
| assert.Len(t, pod.Spec.ImagePullSecrets, 1, "should not duplicate") | |
| }) | |
| t.Run("does not append for default registry", func(t *testing.T) { | |
| pm := &podManager{ | |
| enablePrivateRegistries: true, | |
| registryAuthSecretName: "my-auth-secret", | |
| } | |
| pod := &corev1.Pod{ | |
| Spec: corev1.PodSpec{}, | |
| } | |
| pm.appendImagePullSecret(defaultRegistry+"my-fn:latest", pod) | |
| assert.Empty(t, pod.Spec.ImagePullSecrets) | |
| }) | |
| t.Run("does not append when private registries disabled", func(t *testing.T) { | |
| pm := &podManager{ | |
| enablePrivateRegistries: false, | |
| registryAuthSecretName: "my-auth-secret", | |
| } | |
| pod := &corev1.Pod{ | |
| Spec: corev1.PodSpec{}, | |
| } | |
| pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod) | |
| assert.Empty(t, pod.Spec.ImagePullSecrets) | |
| }) | |
| } | |
| func TestLoadTLSConfig(t *testing.T) { | |
| t.Run("valid PEM certificate", func(t *testing.T) { | |
| // Generate a self-signed certificate for testing | |
| certPEM := generateSelfSignedCertPEM(t) | |
| tmpFile, err := os.CreateTemp("", "ca-cert-*.pem") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.Write(certPEM) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| tlsConfig, err := loadTLSConfig(tmpFile.Name()) | |
| require.NoError(t, err) | |
| assert.NotNil(t, tlsConfig) | |
| assert.NotNil(t, tlsConfig.RootCAs) | |
| }) | |
| t.Run("invalid PEM data", func(t *testing.T) { | |
| tmpFile, err := os.CreateTemp("", "ca-cert-invalid-*.pem") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString("not a valid PEM certificate") | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| _, err = loadTLSConfig(tmpFile.Name()) | |
| assert.Error(t, err) | |
| assert.Contains(t, err.Error(), "failed to append certificates") | |
| }) | |
| t.Run("missing file", func(t *testing.T) { | |
| _, err := loadTLSConfig("/nonexistent/ca.pem") | |
| assert.Error(t, err) | |
| }) | |
| } | |
| func TestCreateTransport(t *testing.T) { | |
| certPEM := generateSelfSignedCertPEM(t) | |
| tmpFile, err := os.CreateTemp("", "transport-cert-*.pem") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.Write(certPEM) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| tlsConfig, err := loadTLSConfig(tmpFile.Name()) | |
| require.NoError(t, err) | |
| transport := createTransport(tlsConfig) | |
| assert.NotNil(t, transport) | |
| assert.Equal(t, tlsConfig, transport.TLSClientConfig) | |
| } | |
| func TestFindPodsForService(t *testing.T) { | |
| t.Run("returns matching pods", func(t *testing.T) { | |
| svc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "my-svc", Namespace: "test-ns"}, | |
| Spec: corev1.ServiceSpec{ | |
| Selector: map[string]string{"app": "my-fn"}, | |
| }, | |
| } | |
| matchingPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Name: "matching-pod", | |
| Namespace: "test-ns", | |
| Labels: map[string]string{"app": "my-fn"}, | |
| }, | |
| } | |
| nonMatchingPod := &corev1.Pod{ | |
| ObjectMeta: metav1.ObjectMeta{ | |
| Name: "other-pod", | |
| Namespace: "test-ns", | |
| Labels: map[string]string{"app": "other"}, | |
| }, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(matchingPod, nonMatchingPod).Build() | |
| pm := &podManager{kubeClient: kubeClient} | |
| pods, err := pm.findPodsForService(t.Context(), svc) | |
| require.NoError(t, err) | |
| assert.Len(t, pods, 1) | |
| assert.Equal(t, "matching-pod", pods[0].Name) | |
| }) | |
| t.Run("returns empty when no pods match", func(t *testing.T) { | |
| svc := &corev1.Service{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "my-svc", Namespace: "test-ns"}, | |
| Spec: corev1.ServiceSpec{ | |
| Selector: map[string]string{"app": "no-match"}, | |
| }, | |
| } | |
| kubeClient := fake.NewClientBuilder().Build() | |
| pm := &podManager{kubeClient: kubeClient} | |
| pods, err := pm.findPodsForService(t.Context(), svc) | |
| require.NoError(t, err) | |
| assert.Empty(t, pods) | |
| }) | |
| } | |
| func TestInspectOrCreateSecret(t *testing.T) { | |
| t.Run("creates secret when not exists", func(t *testing.T) { | |
| // Write a temporary docker config file | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}` | |
| _, err = tmpFile.WriteString(dockerConfig) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| kubeClient := fake.NewClientBuilder().Build() | |
| pm := &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| } | |
| err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret") | |
| require.NoError(t, err) | |
| // Verify secret was created | |
| var secret corev1.Secret | |
| err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret) | |
| require.NoError(t, err) | |
| assert.Equal(t, corev1.SecretTypeDockerConfigJson, secret.Type) | |
| assert.Equal(t, dockerConfig, string(secret.Data[".dockerconfigjson"])) | |
| }) | |
| t.Run("does not recreate when secret matches", func(t *testing.T) { | |
| dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}` | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString(dockerConfig) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| existingSecret := &corev1.Secret{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "test-ns"}, | |
| Data: map[string][]byte{".dockerconfigjson": []byte(dockerConfig)}, | |
| Type: corev1.SecretTypeDockerConfigJson, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(existingSecret).Build() | |
| pm := &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| } | |
| err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret") | |
| require.NoError(t, err) | |
| }) | |
| t.Run("updates secret when content differs", func(t *testing.T) { | |
| newDockerConfig := `{"auths":{"new-registry.example.com":{"auth":"bmV3OnRlc3Q="}}}` | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString(newDockerConfig) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| existingSecret := &corev1.Secret{ | |
| ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "test-ns"}, | |
| Data: map[string][]byte{".dockerconfigjson": []byte(`{"auths":{"old-registry.example.com":{}}}`)}, | |
| Type: corev1.SecretTypeDockerConfigJson, | |
| } | |
| kubeClient := fake.NewClientBuilder().WithObjects(existingSecret).Build() | |
| pm := &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| } | |
| err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret") | |
| require.NoError(t, err) | |
| // Verify secret was updated | |
| var secret corev1.Secret | |
| err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret) | |
| require.NoError(t, err) | |
| assert.Equal(t, newDockerConfig, string(secret.Data[".dockerconfigjson"])) | |
| }) | |
| t.Run("returns error for missing auth file", func(t *testing.T) { | |
| kubeClient := fake.NewClientBuilder().Build() | |
| pm := &podManager{ | |
| kubeClient: kubeClient, | |
| namespace: "test-ns", | |
| } | |
| err := pm.InspectOrCreateSecret(t.Context(), "/nonexistent/path", "test-secret") | |
| assert.Error(t, err) | |
| }) | |
| } | |
| func TestGetCustomAuth(t *testing.T) { | |
| t.Run("parses docker config and returns authenticator", func(t *testing.T) { | |
| dockerConfig := `{"auths":{"gcr.io":{"username":"_json_key","password":"secret","auth":"X2pzb25fa2V5OnNlY3JldA=="}}}` | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString(dockerConfig) | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| pm := &podManager{} | |
| // Use a reference that resolves to gcr.io | |
| ref, err := parseTestReference("gcr.io/my-project/my-image:latest") | |
| require.NoError(t, err) | |
| auth, err := pm.getCustomAuth(ref, tmpFile.Name()) | |
| require.NoError(t, err) | |
| assert.NotNil(t, auth) | |
| }) | |
| t.Run("returns error for missing file", func(t *testing.T) { | |
| pm := &podManager{} | |
| ref, err := parseTestReference("gcr.io/my-project/my-image:latest") | |
| require.NoError(t, err) | |
| _, err = pm.getCustomAuth(ref, "/nonexistent/path") | |
| assert.Error(t, err) | |
| }) | |
| t.Run("returns error for invalid JSON", func(t *testing.T) { | |
| tmpFile, err := os.CreateTemp("", "dockerconfig-*.json") | |
| require.NoError(t, err) | |
| defer os.Remove(tmpFile.Name()) | |
| _, err = tmpFile.WriteString("{invalid json") | |
| require.NoError(t, err) | |
| require.NoError(t, tmpFile.Close()) | |
| pm := &podManager{} | |
| ref, err := parseTestReference("gcr.io/my-project/my-image:latest") | |
| require.NoError(t, err) | |
| _, err = pm.getCustomAuth(ref, tmpFile.Name()) | |
| assert.Error(t, err) | |
| }) | |
| } | |
| // generateSelfSignedCertPEM generates a self-signed certificate PEM for testing. | |
| func generateSelfSignedCertPEM(t *testing.T) []byte { | |
| t.Helper() | |
| key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) | |
| require.NoError(t, err) | |
| template := x509.Certificate{ | |
| SerialNumber: big.NewInt(1), | |
| Subject: pkix.Name{Organization: []string{"Test"}}, | |
| NotBefore: time.Now(), | |
| NotAfter: time.Now().Add(time.Hour), | |
| KeyUsage: x509.KeyUsageCertSign, | |
| IsCA: true, | |
| IPAddresses: []net.IP{net.ParseIP("127.0.0.1")}, | |
| } | |
| certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) | |
| require.NoError(t, err) | |
| return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) | |
| } | |
// parseTestReference is a helper to parse an image reference for testing.
// It delegates to go-containerregistry's name.ParseReference, which applies
// the library's default registry and tag rules to the given image string.
func parseTestReference(image string) (name.Reference, error) {
	return name.ParseReference(image)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment