Skip to content

Instantly share code, notes, and snippets.

@recalde
Created November 14, 2024 03:20
Show Gist options
  • Save recalde/7282cdc2bc4324c3435efed4e344881a to your computer and use it in GitHub Desktop.
Save recalde/7282cdc2bc4324c3435efed4e344881a to your computer and use it in GitHub Desktop.
Pod Watcher
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// Hosted background service that watches pods in a single Kubernetes namespace
/// and mirrors them into an in-memory cache keyed by pod name.
/// </summary>
public class KubernetesPodWatcherService : BackgroundService
{
    /// <summary>Namespace whose pods are watched.</summary>
    private const string WatchedNamespace = "default";

    /// <summary>Pause between watch sessions, to avoid rapid reconnections.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(1);

    private readonly ILogger<KubernetesPodWatcherService> _logger;
    private readonly IKubernetes _kubernetesClient;
    private readonly ConcurrentDictionary<string, V1Pod> _podsCache;

    /// <summary>
    /// Creates the watcher service.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle, event and error messages.</param>
    /// <param name="kubernetesClient">Client used to open the pod watch.</param>
    public KubernetesPodWatcherService(ILogger<KubernetesPodWatcherService> logger, IKubernetes kubernetesClient)
    {
        _logger = logger;
        _kubernetesClient = kubernetesClient;
        _podsCache = new ConcurrentDictionary<string, V1Pod>();
    }

    /// <summary>
    /// Runs watch sessions back to back until <paramref name="stoppingToken"/> is cancelled.
    /// Each session is held open until the watcher reports that it closed; failures are
    /// logged and followed by a reconnect instead of ending the service.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Dispose the registration on exit so the callback is not kept alive by the token.
        using (stoppingToken.Register(() => _logger.LogInformation("Kubernetes Pod Watcher is stopping.")))
        {
            _logger.LogInformation("Kubernetes Pod Watcher started.");

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await WatchUntilClosedAsync(stoppingToken).ConfigureAwait(false);
                    _logger.LogInformation("Pod watcher closed, reconnecting...");
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown path.
                    break;
                }
                catch (Exception ex)
                {
                    // A failed connection attempt must not end the service; log and retry.
                    _logger.LogError(ex, "Error watching pods");
                }

                try
                {
                    // Short delay to avoid rapid reconnections.
                    await Task.Delay(ReconnectDelay, stoppingToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }

    /// <summary>
    /// Returns the live pod cache (not a snapshot); it keeps changing as watch events arrive.
    /// </summary>
    public ConcurrentDictionary<string, V1Pod> GetCurrentPods() => _podsCache;

    /// <summary>
    /// Opens one watch session and completes when the watcher signals that it closed.
    /// Throws <see cref="OperationCanceledException"/> if the token is cancelled first.
    /// </summary>
    private async Task WatchUntilClosedAsync(CancellationToken stoppingToken)
    {
        // Watch() only subscribes and returns immediately, so the response has to be kept
        // alive until onClosed fires; this completion source is what the method waits on.
        var closed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var response = await _kubernetesClient
            .ListNamespacedPodWithHttpMessagesAsync(WatchedNamespace, watch: true, cancellationToken: stoppingToken)
            .ConfigureAwait(false);

        using (response)
        using (response.Watch<V1Pod, V1PodList>(
            onEvent: HandlePodEvent,
            onError: e => _logger.LogError(e, "Error watching pods"),
            onClosed: () => closed.TrySetResult(true)))
        using (stoppingToken.Register(() => closed.TrySetCanceled(stoppingToken)))
        {
            await closed.Task.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Applies a single watch event to the cache: Added/Modified store the pod,
    /// Deleted removes it, any other event type is only logged.
    /// </summary>
    private void HandlePodEvent(WatchEventType eventType, V1Pod pod)
    {
        var podName = pod?.Metadata?.Name;
        if (podName == null)
        {
            // A null key would make the dictionary calls below throw inside the watcher callback.
            _logger.LogWarning("Ignoring {EventType} watch event without a pod name", eventType);
            return;
        }

        switch (eventType)
        {
            case WatchEventType.Added:
            case WatchEventType.Modified:
                _podsCache[podName] = pod;
                break;
            case WatchEventType.Deleted:
                _podsCache.TryRemove(podName, out _);
                break;
        }

        _logger.LogInformation("Pod {PodName} {EventType}", podName, eventType);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment