Skip to content

Instantly share code, notes, and snippets.

@thc1006
Created February 18, 2026 17:10
Show Gist options
  • Select an option

  • Save thc1006/daa56f0b5267094578797afbcf194a48 to your computer and use it in GitHub Desktop.

Select an option

Save thc1006/daa56f0b5267094578797afbcf194a48 to your computer and use it in GitHub Desktop.
Unit tests for PR #336 (parallel KRM function evaluator pods) — podcachemanager.go & podmanager.go coverage
// Copyright 2025 The Nephio Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
// makePodInfoWithLoad creates a functionPodInfo with the specified concurrent evaluation count.
func makePodInfoWithLoad(load int32) functionPodInfo {
counter := &atomic.Int32{}
counter.Store(load)
return functionPodInfo{
concurrentEvaluations: counter,
fnEvaluationMutex: &sync.Mutex{},
lastActivity: time.Now(),
waitlist: []chan<- *connectionResponse{},
}
}
// makeReadyPodInfo creates a functionPodInfo with podData, for testing functions that require a ready pod.
func makeReadyPodInfo(image string, podKey, serviceKey client.ObjectKey, grpcConn *grpc.ClientConn, load int32) functionPodInfo {
counter := &atomic.Int32{}
counter.Store(load)
return functionPodInfo{
podData: &podData{
image: image,
podKey: &podKey,
serviceKey: &serviceKey,
grpcConnection: grpcConn,
},
concurrentEvaluations: counter,
fnEvaluationMutex: &sync.Mutex{},
lastActivity: time.Now(),
waitlist: []chan<- *connectionResponse{},
}
}
func TestFindBestPod(t *testing.T) {
pcm := &podCacheManager{}
tests := []struct {
name string
fn *functionInfo
expectedIdx int
expectedWaitlist int
}{
{
name: "nil function info returns -1",
fn: nil,
expectedIdx: -1,
expectedWaitlist: 0,
},
{
name: "empty pods returns -1",
fn: &functionInfo{pods: []functionPodInfo{}},
expectedIdx: -1,
expectedWaitlist: 0,
},
{
name: "single pod with no load",
fn: &functionInfo{pods: []functionPodInfo{
makePodInfoWithLoad(0),
}},
expectedIdx: 0,
expectedWaitlist: 0,
},
{
name: "single pod with load",
fn: &functionInfo{pods: []functionPodInfo{
makePodInfoWithLoad(5),
}},
expectedIdx: 0,
expectedWaitlist: 5,
},
{
name: "multiple pods selects least loaded",
fn: &functionInfo{pods: []functionPodInfo{
makePodInfoWithLoad(5),
makePodInfoWithLoad(1),
makePodInfoWithLoad(8),
}},
expectedIdx: 1,
expectedWaitlist: 1,
},
{
name: "multiple pods with equal load selects first",
fn: &functionInfo{pods: []functionPodInfo{
makePodInfoWithLoad(3),
makePodInfoWithLoad(3),
makePodInfoWithLoad(3),
}},
expectedIdx: 0,
expectedWaitlist: 3,
},
{
name: "last pod is least loaded",
fn: &functionInfo{pods: []functionPodInfo{
makePodInfoWithLoad(10),
makePodInfoWithLoad(7),
makePodInfoWithLoad(2),
}},
expectedIdx: 2,
expectedWaitlist: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
idx, waitlist := pcm.findBestPod(tt.fn)
assert.Equal(t, tt.expectedIdx, idx)
assert.Equal(t, tt.expectedWaitlist, waitlist)
})
}
}
func TestGetParamsForImage(t *testing.T) {
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
maxWaitlistLength: 2,
maxParallelPodsPerFunction: 3,
configMap: map[string]podCacheConfigEntry{
"full-override": {
Name: "full-override",
TimeToLive: "5m",
MaxWaitlistLength: 10,
MaxParallelPodsPerFunction: 5,
},
"partial-override": {
Name: "partial-override",
TimeToLive: "3m",
MaxWaitlistLength: 0, // zero -> falls back to default
},
"invalid-ttl": {
Name: "invalid-ttl",
TimeToLive: "not-a-duration",
MaxWaitlistLength: 4,
MaxParallelPodsPerFunction: 2,
},
"zero-ttl": {
Name: "zero-ttl",
TimeToLive: "0s",
MaxWaitlistLength: 1,
MaxParallelPodsPerFunction: 1,
},
},
}
tests := []struct {
name string
image string
expectedTTL time.Duration
expectedWaitlist int
expectedMaxPods int
}{
{
name: "full override from configMap",
image: "full-override",
expectedTTL: 5 * time.Minute,
expectedWaitlist: 10,
expectedMaxPods: 5,
},
{
name: "partial override falls back to defaults for zero values",
image: "partial-override",
expectedTTL: 3 * time.Minute,
expectedWaitlist: 2, // default
expectedMaxPods: 3, // default
},
{
name: "invalid TTL falls back to default TTL",
image: "invalid-ttl",
expectedTTL: 10 * time.Minute, // default
expectedWaitlist: 4,
expectedMaxPods: 2,
},
{
name: "zero TTL falls back to default TTL",
image: "zero-ttl",
expectedTTL: 10 * time.Minute, // default
expectedWaitlist: 1,
expectedMaxPods: 1,
},
{
name: "image not in configMap uses all defaults",
image: "unknown-image",
expectedTTL: 10 * time.Minute,
expectedWaitlist: 2,
expectedMaxPods: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ttl, maxWaitlist, maxPods := pcm.getParamsForImage(tt.image)
assert.Equal(t, tt.expectedTTL, ttl)
assert.Equal(t, tt.expectedWaitlist, maxWaitlist)
assert.Equal(t, tt.expectedMaxPods, maxPods)
})
}
}
func TestLoadPodCacheConfig(t *testing.T) {
t.Run("valid config file", func(t *testing.T) {
content := `
- name: "gcr.io/kpt-fn/apply-replacements"
timeToLive: "30m"
maxWaitlistLength: 5
maxParallelPodsPerFunction: 3
- name: "gcr.io/kpt-fn/set-namespace"
timeToLive: "10m"
maxWaitlistLength: 2
maxParallelPodsPerFunction: 1
`
tmpFile, err := os.CreateTemp("", "pod-cache-config-*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString(content)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
configMap, err := loadPodCacheConfig(tmpFile.Name())
require.NoError(t, err)
assert.Len(t, configMap, 2)
entry, ok := configMap["gcr.io/kpt-fn/apply-replacements"]
assert.True(t, ok)
assert.Equal(t, "30m", entry.TimeToLive)
assert.Equal(t, 5, entry.MaxWaitlistLength)
assert.Equal(t, 3, entry.MaxParallelPodsPerFunction)
entry2, ok := configMap["gcr.io/kpt-fn/set-namespace"]
assert.True(t, ok)
assert.Equal(t, "10m", entry2.TimeToLive)
})
t.Run("empty config file", func(t *testing.T) {
tmpFile, err := os.CreateTemp("", "pod-cache-config-empty-*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString("[]")
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
configMap, err := loadPodCacheConfig(tmpFile.Name())
require.NoError(t, err)
assert.Empty(t, configMap)
})
t.Run("invalid YAML", func(t *testing.T) {
tmpFile, err := os.CreateTemp("", "pod-cache-config-invalid-*.yaml")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString("{{invalid yaml")
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
_, err = loadPodCacheConfig(tmpFile.Name())
assert.Error(t, err)
})
t.Run("missing file", func(t *testing.T) {
_, err := loadPodCacheConfig("/nonexistent/path/config.yaml")
assert.Error(t, err)
})
}
func TestNewPodInfo(t *testing.T) {
t.Run("nil channel creates pod with empty waitlist", func(t *testing.T) {
pod := NewPodInfo(nil)
assert.Nil(t, pod.podData)
assert.Empty(t, pod.waitlist)
assert.Equal(t, int32(0), pod.concurrentEvaluations.Load())
assert.NotNil(t, pod.fnEvaluationMutex)
})
t.Run("non-nil channel adds to waitlist and increments counter", func(t *testing.T) {
ch := make(chan *connectionResponse, 1)
pod := NewPodInfo(ch)
assert.Nil(t, pod.podData)
assert.Len(t, pod.waitlist, 1)
assert.Equal(t, int32(1), pod.concurrentEvaluations.Load())
})
}
func TestSendResponse(t *testing.T) {
t.Run("sends error when err is not nil", func(t *testing.T) {
pod := &functionPodInfo{
podData: &podData{image: "test"},
concurrentEvaluations: &atomic.Int32{},
fnEvaluationMutex: &sync.Mutex{},
}
ch := make(chan *connectionResponse, 1)
testErr := fmt.Errorf("test error")
pod.SendResponse(ch, testErr)
resp := <-ch
assert.Error(t, resp.err)
assert.Equal(t, "test error", resp.err.Error())
})
t.Run("sends error when podData is nil", func(t *testing.T) {
pod := &functionPodInfo{
podData: nil,
concurrentEvaluations: &atomic.Int32{},
fnEvaluationMutex: &sync.Mutex{},
}
ch := make(chan *connectionResponse, 1)
pod.SendResponse(ch, nil)
resp := <-ch
assert.Error(t, resp.err)
assert.Contains(t, resp.err.Error(), "pod is not ready")
})
t.Run("sends success with podData", func(t *testing.T) {
conn, err := grpc.NewClient("localhost:9446", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
podKey := client.ObjectKey{Name: "test-pod", Namespace: "test-ns"}
serviceKey := client.ObjectKey{Name: "test-svc", Namespace: "test-ns"}
pod := &functionPodInfo{
podData: &podData{
image: "test-image",
grpcConnection: conn,
podKey: &podKey,
serviceKey: &serviceKey,
},
concurrentEvaluations: &atomic.Int32{},
fnEvaluationMutex: &sync.Mutex{},
}
ch := make(chan *connectionResponse, 1)
pod.SendResponse(ch, nil)
resp := <-ch
assert.NoError(t, resp.err)
assert.Equal(t, "test-image", resp.podData.image)
assert.NotNil(t, resp.grpcConnection)
assert.NotNil(t, resp.fnEvaluationMutex)
assert.NotNil(t, resp.concurrentEvaluations)
})
}
func TestWaitlistLen(t *testing.T) {
counter := &atomic.Int32{}
counter.Store(7)
pod := functionPodInfo{
concurrentEvaluations: counter,
}
assert.Equal(t, 7, pod.WaitlistLen())
counter.Store(0)
assert.Equal(t, 0, pod.WaitlistLen())
}
func TestFunctionInfo(t *testing.T) {
pcm := &podCacheManager{
functions: map[string]*functionInfo{},
}
t.Run("creates new entry for unknown image", func(t *testing.T) {
fn := pcm.FunctionInfo("new-image")
assert.NotNil(t, fn)
assert.Empty(t, fn.pods)
// Verify it was stored in the map
stored, ok := pcm.functions["new-image"]
assert.True(t, ok)
assert.Equal(t, fn, stored)
})
t.Run("returns existing entry for known image", func(t *testing.T) {
existing := &functionInfo{pods: []functionPodInfo{makePodInfoWithLoad(1)}}
pcm.functions["existing-image"] = existing
fn := pcm.FunctionInfo("existing-image")
assert.Equal(t, existing, fn)
assert.Len(t, fn.pods, 1)
})
}
func TestForEachConcurrently(t *testing.T) {
entries := []podCacheConfigEntry{
{Name: "fn-1"},
{Name: "fn-2"},
{Name: "fn-3"},
}
var mu sync.Mutex
visited := make(map[string]bool)
forEachConcurrently(entries, func(entry podCacheConfigEntry) {
mu.Lock()
defer mu.Unlock()
visited[entry.Name] = true
})
assert.Len(t, visited, 3)
assert.True(t, visited["fn-1"])
assert.True(t, visited["fn-2"])
assert.True(t, visited["fn-3"])
}
func TestRemoveUnhealthyPods(t *testing.T) {
const testNs = "test-ns"
t.Run("nil function info is no-op", func(t *testing.T) {
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
// Should not panic
pcm.removeUnhealthyPods(nil, false)
})
t.Run("pod under creation (nil podData) is kept", func(t *testing.T) {
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
fn := &functionInfo{
pods: []functionPodInfo{
{
podData: nil, // under creation
concurrentEvaluations: &atomic.Int32{},
fnEvaluationMutex: &sync.Mutex{},
},
},
}
pcm.removeUnhealthyPods(fn, false)
assert.Len(t, fn.pods, 1, "pod under creation should be kept")
})
t.Run("pod not found in k8s is removed", func(t *testing.T) {
// Pod exists in cache but NOT in fake k8s client
podKey := client.ObjectKey{Name: "gone-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "gone-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(), // empty - no pods
namespace: testNs,
},
}
fn := &functionInfo{
pods: []functionPodInfo{
makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0),
},
}
pcm.removeUnhealthyPods(fn, false)
assert.Empty(t, fn.pods, "pod not found in k8s should be removed")
})
t.Run("pod in Failed state is removed", func(t *testing.T) {
podKey := client.ObjectKey{Name: "failed-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "failed-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "failed-pod", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodFailed},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "failed-svc", Namespace: testNs},
}
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
}
fn := &functionInfo{
pods: []functionPodInfo{
makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0),
},
}
pcm.removeUnhealthyPods(fn, false)
assert.Empty(t, fn.pods, "pod in Failed state should be removed")
})
t.Run("healthy pod is kept", func(t *testing.T) {
podKey := client.ObjectKey{Name: "healthy-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "healthy-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "healthy-pod", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "healthy-svc", Namespace: testNs},
}
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
}
fn := &functionInfo{
pods: []functionPodInfo{
makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0),
},
}
pcm.removeUnhealthyPods(fn, false)
assert.Len(t, fn.pods, 1, "healthy pod should be kept")
})
t.Run("idle pod past TTL removed when removeIdle is true", func(t *testing.T) {
podKey := client.ObjectKey{Name: "idle-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "idle-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "idle-pod", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "idle-svc", Namespace: testNs},
}
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
}
podInfo := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
podInfo.lastActivity = time.Now().Add(-15 * time.Minute) // past TTL
fn := &functionInfo{pods: []functionPodInfo{podInfo}}
pcm.removeUnhealthyPods(fn, true)
assert.Empty(t, fn.pods, "idle pod past TTL should be removed when removeIdle=true")
})
t.Run("idle pod past TTL kept when removeIdle is false", func(t *testing.T) {
podKey := client.ObjectKey{Name: "idle-pod2", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "idle-svc2", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "idle-pod2", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "idle-svc2", Namespace: testNs},
}
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
}
podInfo := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
podInfo.lastActivity = time.Now().Add(-15 * time.Minute) // past TTL
fn := &functionInfo{pods: []functionPodInfo{podInfo}}
pcm.removeUnhealthyPods(fn, false)
assert.Len(t, fn.pods, 1, "idle pod past TTL should be kept when removeIdle=false")
})
}
func TestGarbageCollectorUnit(t *testing.T) {
const testNs = "test-ns"
podKey := client.ObjectKey{Name: "gc-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "gc-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "gc-pod", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "gc-svc", Namespace: testNs},
}
t.Run("removes empty function entries from map", func(t *testing.T) {
pcm := &podCacheManager{
podTTL: 1 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
functions: map[string]*functionInfo{
"expired-image": {
pods: []functionPodInfo{
func() functionPodInfo {
p := makeReadyPodInfo("expired-image", podKey, serviceKey, conn, 0)
p.lastActivity = time.Now().Add(-5 * time.Minute)
return p
}(),
},
},
},
}
pcm.garbageCollector()
// Allow background deletion goroutines to execute
time.Sleep(100 * time.Millisecond)
assert.Empty(t, pcm.functions, "expired function entry should be removed from map")
})
}
func TestRedistributeLoad(t *testing.T) {
const testNs = "test-ns"
t.Run("redistributes connections to available pods", func(t *testing.T) {
podKey := client.ObjectKey{Name: "redist-pod", Namespace: testNs}
serviceKey := client.ObjectKey{Name: "redist-svc", Namespace: testNs}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "redist-pod", Namespace: testNs},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "redist-svc", Namespace: testNs},
}
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build(),
namespace: testNs,
},
functions: map[string]*functionInfo{},
}
readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
fn := &functionInfo{pods: []functionPodInfo{readyPod}}
pcm.functions["test-image"] = fn
// Create connection channels to redistribute
ch1 := make(chan *connectionResponse, 1)
ch2 := make(chan *connectionResponse, 1)
result := pcm.redistributeLoad("test-image", fn, []chan<- *connectionResponse{ch1, ch2})
assert.True(t, result, "should redistribute successfully")
// Both channels should receive responses
resp1 := <-ch1
assert.NoError(t, resp1.err)
resp2 := <-ch2
assert.NoError(t, resp2.err)
})
t.Run("returns false when no pods available", func(t *testing.T) {
pcm := &podCacheManager{
podTTL: 10 * time.Minute,
configMap: map[string]podCacheConfigEntry{},
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
namespace: testNs,
},
functions: map[string]*functionInfo{},
}
fn := &functionInfo{pods: []functionPodInfo{}}
pcm.functions["empty-image"] = fn
ch := make(chan *connectionResponse, 1)
result := pcm.redistributeLoad("empty-image", fn, []chan<- *connectionResponse{ch})
assert.False(t, result, "should return false with no pods")
})
}
func TestDeletePodWithServiceInBackgroundByObjectKey(t *testing.T) {
t.Run("deletes both pod and service", func(t *testing.T) {
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pd-pod", Namespace: "test-ns"},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "pd-svc", Namespace: "test-ns"},
}
kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build()
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: kubeClient,
},
}
podKey := client.ObjectKeyFromObject(k8sPod)
serviceKey := client.ObjectKeyFromObject(k8sSvc)
pd := podData{
podKey: &podKey,
serviceKey: &serviceKey,
}
pcm.DeletePodWithServiceInBackgroundByObjectKey(pd)
time.Sleep(200 * time.Millisecond)
var pod corev1.Pod
err := kubeClient.Get(t.Context(), podKey, &pod)
assert.Error(t, err, "pod should be deleted")
var svc corev1.Service
err = kubeClient.Get(t.Context(), serviceKey, &svc)
assert.Error(t, err, "service should be deleted")
})
t.Run("handles nil keys gracefully", func(t *testing.T) {
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
pd := podData{
podKey: nil,
serviceKey: nil,
}
// Should not panic
pcm.DeletePodWithServiceInBackgroundByObjectKey(pd)
time.Sleep(50 * time.Millisecond)
})
}
func TestDeletePodAndWait(t *testing.T) {
t.Run("deletes pod and waits for removal", func(t *testing.T) {
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "wait-pod", Namespace: "test-ns"},
}
kubeClient := fake.NewClientBuilder().WithObjects(k8sPod).Build()
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
podReadyTimeout: 5 * time.Second,
},
}
err := pcm.deletePodAndWait(k8sPod)
assert.NoError(t, err)
var pod corev1.Pod
getErr := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sPod), &pod)
assert.Error(t, getErr, "pod should be deleted")
})
}
func TestEnsureCustomAuthSecret(t *testing.T) {
dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}`
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString(dockerConfig)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
kubeClient := fake.NewClientBuilder().Build()
pm := &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
}
err = pm.ensureCustomAuthSecret(t.Context(), tmpFile.Name(), "test-secret")
assert.NoError(t, err)
// Verify secret was created
var secret corev1.Secret
err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret)
assert.NoError(t, err)
}
func TestDeletePodInBackground(t *testing.T) {
t.Run("nil pod does not panic", func(t *testing.T) {
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
// Should not panic
pcm.DeletePodInBackground(nil)
time.Sleep(50 * time.Millisecond)
})
t.Run("pod with empty name is skipped", func(t *testing.T) {
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
pod := &corev1.Pod{}
pcm.DeletePodInBackground(pod)
time.Sleep(50 * time.Millisecond)
})
t.Run("normal pod is deleted", func(t *testing.T) {
k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "to-delete", Namespace: "test-ns"},
}
kubeClient := fake.NewClientBuilder().WithObjects(k8sPod).Build()
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: kubeClient,
},
}
pcm.DeletePodInBackground(k8sPod)
time.Sleep(100 * time.Millisecond)
var pod corev1.Pod
err := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sPod), &pod)
assert.Error(t, err, "pod should be deleted")
})
}
func TestDeleteServiceInBackground(t *testing.T) {
t.Run("nil service does not panic", func(t *testing.T) {
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
pcm.DeleteServiceInBackground(nil)
time.Sleep(50 * time.Millisecond)
})
t.Run("service with empty name is skipped", func(t *testing.T) {
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: fake.NewClientBuilder().Build(),
},
}
svc := &corev1.Service{}
pcm.DeleteServiceInBackground(svc)
time.Sleep(50 * time.Millisecond)
})
t.Run("normal service is deleted", func(t *testing.T) {
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "to-delete-svc", Namespace: "test-ns"},
}
kubeClient := fake.NewClientBuilder().WithObjects(k8sSvc).Build()
pcm := &podCacheManager{
podManager: &podManager{
kubeClient: kubeClient,
},
}
pcm.DeleteServiceInBackground(k8sSvc)
time.Sleep(100 * time.Millisecond)
var svc corev1.Service
err := kubeClient.Get(t.Context(), client.ObjectKeyFromObject(k8sSvc), &svc)
assert.Error(t, err, "service should be deleted")
})
}
// Copyright 2025 The Nephio Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"os"
"path/filepath"
"testing"
"time"
"github.com/google/go-containerregistry/pkg/name"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
func TestPodID(t *testing.T) {
tests := []struct {
name string
image string
hash string
postFix string
expected string
expectError bool
}{
{
name: "standard image with tag",
image: "gcr.io/kpt-fn/apply-replacements:latest",
hash: "5245a52778d684fa698f69861fb2e058b308f6a74fed5bf2fe77d97bad5e071c",
postFix: "1",
expected: "apply-replacements-latest-1-5245a527",
},
{
name: "image without explicit tag defaults to latest",
image: "gcr.io/kpt-fn/set-namespace",
hash: "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
postFix: "2",
expected: "set-namespace-latest-2-abcdef12",
},
{
name: "image with version tag containing dots",
image: "gcr.io/kpt-fn/set-labels:v0.4.1",
hash: "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
postFix: "1",
expected: "set-labels-v041-1-12345678",
},
{
name: "image with underscores in name",
image: "gcr.io/kpt-fn/my_function:latest",
hash: "aabbccddee112233aabbccddee112233aabbccddee112233aabbccddee112233",
postFix: "1",
expected: "my-function-latest-1-aabbccdd",
},
{
name: "invalid image reference",
image: "invalid@oci@ref",
hash: "deadbeef",
postFix: "1",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := podID(tt.image, tt.hash, tt.postFix)
if tt.expectError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
func TestIsPodTemplateSameVersion(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
templateVersion string
expected bool
}{
{
name: "matching version",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
templateVersionAnnotation: "v1",
},
},
},
templateVersion: "v1",
expected: true,
},
{
name: "mismatching version",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
templateVersionAnnotation: "v1",
},
},
},
templateVersion: "v2",
expected: false,
},
{
name: "no annotation",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
},
templateVersion: "v1",
expected: false,
},
{
name: "nil annotations",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{},
},
templateVersion: "v1",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isPodTemplateSameVersion(tt.pod, tt.templateVersion)
assert.Equal(t, tt.expected, result)
})
}
}
func TestHasImagePullSecret(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
secretName string
expected bool
}{
{
name: "secret found",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{
{Name: "my-secret"},
{Name: "other-secret"},
},
},
},
secretName: "my-secret",
expected: true,
},
{
name: "secret not found",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{
{Name: "other-secret"},
},
},
},
secretName: "my-secret",
expected: false,
},
{
name: "empty secrets list",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{},
},
},
secretName: "my-secret",
expected: false,
},
{
name: "nil secrets list",
pod: &corev1.Pod{
Spec: corev1.PodSpec{},
},
secretName: "my-secret",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := hasImagePullSecret(tt.pod, tt.secretName)
assert.Equal(t, tt.expected, result)
})
}
}
func TestPatchNewPodContainer(t *testing.T) {
pm := &podManager{maxGrpcMessageSize: 4 * 1024 * 1024}
t.Run("patches function container successfully", func(t *testing.T) {
pod := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: functionContainerName,
Image: "to-be-replaced",
Command: []string{filepath.Join(volumeMountPath, wrapperServerBin)},
},
},
},
}
de := digestAndEntrypoint{
digest: "abc123",
entrypoint: []string{"/my-function"},
}
err := pm.patchNewPodContainer(pod, de, "gcr.io/kpt-fn/my-function:latest")
require.NoError(t, err)
container := pod.Spec.Containers[0]
assert.Equal(t, "gcr.io/kpt-fn/my-function:latest", container.Image)
assert.Contains(t, container.Args, "--port")
assert.Contains(t, container.Args, defaultWrapperServerPort)
assert.Contains(t, container.Args, "--")
assert.Contains(t, container.Args, "/my-function")
})
t.Run("returns error when function container not found", func(t *testing.T) {
pod := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "not-a-function",
Image: "some-image",
},
},
},
}
de := digestAndEntrypoint{
digest: "abc123",
entrypoint: []string{"/entry"},
}
err := pm.patchNewPodContainer(pod, de, "test-image")
assert.Error(t, err)
assert.Contains(t, err.Error(), functionContainerName)
})
t.Run("handles multiple containers", func(t *testing.T) {
pod := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "sidecar", Image: "sidecar-image"},
{
Name: functionContainerName,
Image: "to-be-replaced",
Command: []string{filepath.Join(volumeMountPath, wrapperServerBin)},
},
},
},
}
de := digestAndEntrypoint{
digest: "abc123",
entrypoint: []string{"/fn"},
}
err := pm.patchNewPodContainer(pod, de, "my-image")
require.NoError(t, err)
assert.Equal(t, "sidecar-image", pod.Spec.Containers[0].Image, "sidecar should be unchanged")
assert.Equal(t, "my-image", pod.Spec.Containers[1].Image, "function container should be patched")
})
}
func TestPatchNewPodMetadata(t *testing.T) {
pm := &podManager{namespace: "test-ns"}
t.Run("sets namespace labels and annotations from scratch", func(t *testing.T) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{},
}
pm.patchNewPodMetadata(pod, "my-pod-id", "v2")
assert.Equal(t, "test-ns", pod.Namespace)
assert.Equal(t, "v2", pod.Annotations[templateVersionAnnotation])
assert.Equal(t, "my-pod-id", pod.Labels[krmFunctionImageLabel])
})
t.Run("preserves existing annotations and labels", func(t *testing.T) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"existing-key": "existing-val"},
Labels: map[string]string{"existing-label": "label-val"},
},
}
pm.patchNewPodMetadata(pod, "pod-id", "v1")
assert.Equal(t, "existing-val", pod.Annotations["existing-key"], "existing annotations preserved")
assert.Equal(t, "v1", pod.Annotations[templateVersionAnnotation], "template version set")
assert.Equal(t, "label-val", pod.Labels["existing-label"], "existing labels preserved")
assert.Equal(t, "pod-id", pod.Labels[krmFunctionImageLabel], "function image label set")
})
}
func TestAppendImagePullSecret(t *testing.T) {
t.Run("appends secret for private registry image", func(t *testing.T) {
pm := &podManager{
enablePrivateRegistries: true,
registryAuthSecretName: "my-auth-secret",
}
pod := &corev1.Pod{
Spec: corev1.PodSpec{},
}
pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod)
require.Len(t, pod.Spec.ImagePullSecrets, 1)
assert.Equal(t, "my-auth-secret", pod.Spec.ImagePullSecrets[0].Name)
})
t.Run("does not append if secret already exists", func(t *testing.T) {
pm := &podManager{
enablePrivateRegistries: true,
registryAuthSecretName: "my-auth-secret",
}
pod := &corev1.Pod{
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{
{Name: "my-auth-secret"},
},
},
}
pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod)
assert.Len(t, pod.Spec.ImagePullSecrets, 1, "should not duplicate")
})
t.Run("does not append for default registry", func(t *testing.T) {
pm := &podManager{
enablePrivateRegistries: true,
registryAuthSecretName: "my-auth-secret",
}
pod := &corev1.Pod{
Spec: corev1.PodSpec{},
}
pm.appendImagePullSecret(defaultRegistry+"my-fn:latest", pod)
assert.Empty(t, pod.Spec.ImagePullSecrets)
})
t.Run("does not append when private registries disabled", func(t *testing.T) {
pm := &podManager{
enablePrivateRegistries: false,
registryAuthSecretName: "my-auth-secret",
}
pod := &corev1.Pod{
Spec: corev1.PodSpec{},
}
pm.appendImagePullSecret("private.registry.io/my-fn:latest", pod)
assert.Empty(t, pod.Spec.ImagePullSecrets)
})
}
func TestLoadTLSConfig(t *testing.T) {
t.Run("valid PEM certificate", func(t *testing.T) {
// Generate a self-signed certificate for testing
certPEM := generateSelfSignedCertPEM(t)
tmpFile, err := os.CreateTemp("", "ca-cert-*.pem")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.Write(certPEM)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
tlsConfig, err := loadTLSConfig(tmpFile.Name())
require.NoError(t, err)
assert.NotNil(t, tlsConfig)
assert.NotNil(t, tlsConfig.RootCAs)
})
t.Run("invalid PEM data", func(t *testing.T) {
tmpFile, err := os.CreateTemp("", "ca-cert-invalid-*.pem")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString("not a valid PEM certificate")
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
_, err = loadTLSConfig(tmpFile.Name())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to append certificates")
})
t.Run("missing file", func(t *testing.T) {
_, err := loadTLSConfig("/nonexistent/ca.pem")
assert.Error(t, err)
})
}
func TestCreateTransport(t *testing.T) {
certPEM := generateSelfSignedCertPEM(t)
tmpFile, err := os.CreateTemp("", "transport-cert-*.pem")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.Write(certPEM)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
tlsConfig, err := loadTLSConfig(tmpFile.Name())
require.NoError(t, err)
transport := createTransport(tlsConfig)
assert.NotNil(t, transport)
assert.Equal(t, tlsConfig, transport.TLSClientConfig)
}
func TestFindPodsForService(t *testing.T) {
t.Run("returns matching pods", func(t *testing.T) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "my-svc", Namespace: "test-ns"},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"app": "my-fn"},
},
}
matchingPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "matching-pod",
Namespace: "test-ns",
Labels: map[string]string{"app": "my-fn"},
},
}
nonMatchingPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "other-pod",
Namespace: "test-ns",
Labels: map[string]string{"app": "other"},
},
}
kubeClient := fake.NewClientBuilder().WithObjects(matchingPod, nonMatchingPod).Build()
pm := &podManager{kubeClient: kubeClient}
pods, err := pm.findPodsForService(t.Context(), svc)
require.NoError(t, err)
assert.Len(t, pods, 1)
assert.Equal(t, "matching-pod", pods[0].Name)
})
t.Run("returns empty when no pods match", func(t *testing.T) {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "my-svc", Namespace: "test-ns"},
Spec: corev1.ServiceSpec{
Selector: map[string]string{"app": "no-match"},
},
}
kubeClient := fake.NewClientBuilder().Build()
pm := &podManager{kubeClient: kubeClient}
pods, err := pm.findPodsForService(t.Context(), svc)
require.NoError(t, err)
assert.Empty(t, pods)
})
}
func TestInspectOrCreateSecret(t *testing.T) {
t.Run("creates secret when not exists", func(t *testing.T) {
// Write a temporary docker config file
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}`
_, err = tmpFile.WriteString(dockerConfig)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
kubeClient := fake.NewClientBuilder().Build()
pm := &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
}
err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret")
require.NoError(t, err)
// Verify secret was created
var secret corev1.Secret
err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret)
require.NoError(t, err)
assert.Equal(t, corev1.SecretTypeDockerConfigJson, secret.Type)
assert.Equal(t, dockerConfig, string(secret.Data[".dockerconfigjson"]))
})
t.Run("does not recreate when secret matches", func(t *testing.T) {
dockerConfig := `{"auths":{"registry.example.com":{"auth":"dGVzdDp0ZXN0"}}}`
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString(dockerConfig)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
existingSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "test-ns"},
Data: map[string][]byte{".dockerconfigjson": []byte(dockerConfig)},
Type: corev1.SecretTypeDockerConfigJson,
}
kubeClient := fake.NewClientBuilder().WithObjects(existingSecret).Build()
pm := &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
}
err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret")
require.NoError(t, err)
})
t.Run("updates secret when content differs", func(t *testing.T) {
newDockerConfig := `{"auths":{"new-registry.example.com":{"auth":"bmV3OnRlc3Q="}}}`
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString(newDockerConfig)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
existingSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "test-ns"},
Data: map[string][]byte{".dockerconfigjson": []byte(`{"auths":{"old-registry.example.com":{}}}`)},
Type: corev1.SecretTypeDockerConfigJson,
}
kubeClient := fake.NewClientBuilder().WithObjects(existingSecret).Build()
pm := &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
}
err = pm.InspectOrCreateSecret(t.Context(), tmpFile.Name(), "test-secret")
require.NoError(t, err)
// Verify secret was updated
var secret corev1.Secret
err = kubeClient.Get(t.Context(), client.ObjectKey{Name: "test-secret", Namespace: "test-ns"}, &secret)
require.NoError(t, err)
assert.Equal(t, newDockerConfig, string(secret.Data[".dockerconfigjson"]))
})
t.Run("returns error for missing auth file", func(t *testing.T) {
kubeClient := fake.NewClientBuilder().Build()
pm := &podManager{
kubeClient: kubeClient,
namespace: "test-ns",
}
err := pm.InspectOrCreateSecret(t.Context(), "/nonexistent/path", "test-secret")
assert.Error(t, err)
})
}
func TestGetCustomAuth(t *testing.T) {
t.Run("parses docker config and returns authenticator", func(t *testing.T) {
dockerConfig := `{"auths":{"gcr.io":{"username":"_json_key","password":"secret","auth":"X2pzb25fa2V5OnNlY3JldA=="}}}`
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString(dockerConfig)
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
pm := &podManager{}
// Use a reference that resolves to gcr.io
ref, err := parseTestReference("gcr.io/my-project/my-image:latest")
require.NoError(t, err)
auth, err := pm.getCustomAuth(ref, tmpFile.Name())
require.NoError(t, err)
assert.NotNil(t, auth)
})
t.Run("returns error for missing file", func(t *testing.T) {
pm := &podManager{}
ref, err := parseTestReference("gcr.io/my-project/my-image:latest")
require.NoError(t, err)
_, err = pm.getCustomAuth(ref, "/nonexistent/path")
assert.Error(t, err)
})
t.Run("returns error for invalid JSON", func(t *testing.T) {
tmpFile, err := os.CreateTemp("", "dockerconfig-*.json")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
_, err = tmpFile.WriteString("{invalid json")
require.NoError(t, err)
require.NoError(t, tmpFile.Close())
pm := &podManager{}
ref, err := parseTestReference("gcr.io/my-project/my-image:latest")
require.NoError(t, err)
_, err = pm.getCustomAuth(ref, tmpFile.Name())
assert.Error(t, err)
})
}
// generateSelfSignedCertPEM generates a self-signed certificate PEM for testing.
func generateSelfSignedCertPEM(t *testing.T) []byte {
t.Helper()
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
require.NoError(t, err)
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{Organization: []string{"Test"}},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
KeyUsage: x509.KeyUsageCertSign,
IsCA: true,
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
require.NoError(t, err)
return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
}
// parseTestReference is a helper to parse an image reference for testing.
func parseTestReference(image string) (name.Reference, error) {
return name.ParseReference(image)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment