Skip to content

Instantly share code, notes, and snippets.

@recalde
Created November 14, 2024 03:47
Show Gist options
  • Save recalde/8d74125ccff318a5246e30d6d55a51c4 to your computer and use it in GitHub Desktop.
Save recalde/8d74125ccff318a5246e30d6d55a51c4 to your computer and use it in GitHub Desktop.
Kubernetes Pod Log Watcher

Kubernetes Pod Log File Writer and Watcher Services

These services provide real-time log collection and efficient file writing for logs from selected Kubernetes pods.

Components

KubernetesPodLogWatcherService

  1. Pod Monitoring: Watches for selected pods using a label selector based on the POD_SELECTOR environment variable.
  2. Log Streaming: Streams logs from each pod and publishes them to subscribed
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public class KubernetesPodLogFileWriterService : IHostedService, IDisposable
{
private readonly ILogger<KubernetesPodLogFileWriterService> _logger;
private readonly KubernetesPodLogWatcherService _logWatcher;
private readonly ConcurrentDictionary<string, DateTime> _lastLogTimestampCache;
private readonly ConcurrentDictionary<string, StringBuilder> _logBuffers;
private readonly string _checkpointFilePath = "pod_log_checkpoints.txt";
private readonly string _logDirectory = "pod_logs";
private Timer _flushTimer;
private Guid _subscriptionId;
public KubernetesPodLogFileWriterService(ILogger<KubernetesPodLogFileWriterService> logger, KubernetesPodLogWatcherService logWatcher)
{
_logger = logger;
_logWatcher = logWatcher;
_lastLogTimestampCache = new ConcurrentDictionary<string, DateTime>();
_logBuffers = new ConcurrentDictionary<string, StringBuilder>();
Directory.CreateDirectory(_logDirectory);
LoadCheckpoints();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_subscriptionId = _logWatcher.Subscribe(ProcessLog);
_flushTimer = new Timer(FlushBuffers, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logWatcher.Unsubscribe(_subscriptionId);
FlushBuffers(null);
return Task.CompletedTask;
}
private void ProcessLog(string podName, string logLine)
{
if (TryParseLogTimestamp(logLine, out var logTimestamp) &&
logTimestamp > _lastLogTimestampCache.GetOrAdd(podName, DateTime.MinValue))
{
_lastLogTimestampCache[podName] = logTimestamp;
var buffer = _logBuffers.GetOrAdd(podName, new StringBuilder());
buffer.AppendLine(logLine);
if (buffer.Length >= 1000)
{
WriteLogBuffer(podName);
}
}
}
private void WriteLogBuffer(string podName)
{
var logFilePath = Path.Combine(_logDirectory, $"{podName}.log");
var buffer = _logBuffers.GetOrAdd(podName, new StringBuilder());
File.AppendAllText(logFilePath, buffer.ToString());
buffer.Clear();
SaveCheckpoint(podName);
}
private void FlushBuffers(object state)
{
foreach (var podName in _logBuffers.Keys)
{
WriteLogBuffer(podName);
}
}
private void LoadCheckpoints()
{
if (File.Exists(_checkpointFilePath))
{
foreach (var line in File.ReadAllLines(_checkpointFilePath))
{
var parts = line.Split(',');
if (parts.Length == 2 && DateTime.TryParse(parts[1], out var timestamp))
{
_lastLogTimestampCache[parts[0]] = timestamp;
}
}
}
}
private void SaveCheckpoint(string podName)
{
var checkpointData = new StringBuilder();
foreach (var kvp in _lastLogTimestampCache)
{
checkpointData.AppendLine($"{kvp.Key},{kvp.Value:O}");
}
File.WriteAllText(_checkpointFilePath, checkpointData.ToString());
}
private bool TryParseLogTimestamp(string logLine, out DateTime timestamp)
{
timestamp = DateTime.MinValue;
if (logLine.Length > 20 && DateTime.TryParse(logLine.Substring(0, 20), out var parsedTimestamp))
{
timestamp = parsedTimestamp;
return true;
}
return false;
}
public void Dispose()
{
_flushTimer?.Dispose();
}
}
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public class KubernetesPodLogWatcherService : BackgroundService
{
private readonly ILogger<KubernetesPodLogWatcherService> _logger;
private readonly IKubernetes _kubernetesClient;
private readonly ConcurrentDictionary<string, Task> _logStreamTasks;
private readonly ConcurrentDictionary<Guid, Action<string, string>> _subscribers;
private readonly string _podSelector;
public KubernetesPodLogWatcherService(
ILogger<KubernetesPodLogWatcherService> logger,
IKubernetes kubernetesClient,
string podSelector)
{
_logger = logger;
_kubernetesClient = kubernetesClient;
_podSelector = podSelector;
_logStreamTasks = new ConcurrentDictionary<string, Task>();
_subscribers = new ConcurrentDictionary<Guid, Action<string, string>>();
}
public Guid Subscribe(Action<string, string> logHandler)
{
var id = Guid.NewGuid();
_subscribers[id] = logHandler;
return id;
}
public bool Unsubscribe(Guid subscriberId)
{
return _subscribers.TryRemove(subscriberId, out _);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.Register(() => _logger.LogInformation("Kubernetes Pod Log Watcher is stopping."));
_logger.LogInformation("Kubernetes Pod Log Watcher started.");
while (!stoppingToken.IsCancellationRequested)
{
using (var podWatcher = _kubernetesClient.ListNamespacedPodWithHttpMessagesAsync(
"default",
watch: true,
labelSelector: _podSelector).ConfigureAwait(false).GetAwaiter().GetResult())
{
podWatcher.Watch<V1Pod, V1PodList>(
onEvent: (eventType, pod) =>
{
var podName = pod.Metadata.Name;
if (eventType == WatchEventType.Added || eventType == WatchEventType.Modified)
{
if (!_logStreamTasks.ContainsKey(podName))
{
_logStreamTasks[podName] = StartLogStreamAsync(podName, stoppingToken);
}
}
else if (eventType == WatchEventType.Deleted)
{
_logStreamTasks.TryRemove(podName, out _);
}
},
onError: e => _logger.LogError(e, "Error watching pods"),
onClosed: () => _logger.LogInformation("Pod watcher closed, reconnecting...")
);
}
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
}
}
private async Task StartLogStreamAsync(string podName, CancellationToken cancellationToken)
{
try
{
using var logStream = await _kubernetesClient.ReadNamespacedPodLogAsync(
name: podName,
@namespace: "default",
follow: true,
timestamps: true,
cancellationToken: cancellationToken);
var reader = new StreamReader(logStream);
while (!cancellationToken.IsCancellationRequested && !reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
if (line != null)
{
foreach (var subscriber in _subscribers.Values)
{
subscriber.Invoke(podName, line);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error streaming logs for pod {podName}");
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using k8s;
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
var podSelector = Environment.GetEnvironmentVariable("POD_SELECTOR") ?? string.Empty;
services.AddSingleton<IKubernetes>(sp =>
{
return new Kubernetes(KubernetesClientConfiguration.InClusterConfig());
});
services.AddSingleton<KubernetesPodLogWatcherService>();
services.AddHostedService(sp => sp.GetRequiredService<KubernetesPodLogWatcherService>());
services.AddSingleton<KubernetesPodLogFileWriterService>();
services.AddHostedService(sp => sp.GetRequiredService<KubernetesPodLogFileWriterService>());
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment