|
using System; |
|
using System.Collections.Concurrent; |
|
using System.IO; |
|
using System.Text; |
|
using System.Threading; |
|
using System.Threading.Tasks; |
|
using Microsoft.Extensions.Hosting; |
|
using Microsoft.Extensions.Logging; |
|
|
|
/// <summary>
/// Hosted service that subscribes to <see cref="KubernetesPodLogWatcherService"/>, buffers
/// incoming log lines per pod and appends them to one <c>.log</c> file per pod under the
/// log directory. The newest timestamp that has actually been written to disk is recorded
/// per pod in a checkpoint file, so lines at or before that timestamp are skipped after a restart.
/// </summary>
/// <remarks>
/// Log callbacks and the periodic flush timer may run on different threads. Each pod's
/// buffer is therefore only touched while holding a lock on that buffer, and checkpoint
/// writes are serialised through a dedicated lock.
/// </remarks>
public class KubernetesPodLogFileWriterService : IHostedService, IDisposable
{
    /// <summary>Buffered character count at which a pod's buffer is written out immediately.</summary>
    private const int FlushThresholdChars = 1000;

    /// <summary>Number of leading characters of a log line that are parsed as its timestamp.</summary>
    private const int TimestampPrefixLength = 20;

    /// <summary>Delay before the first periodic flush and between subsequent flushes.</summary>
    private static readonly TimeSpan FlushInterval = TimeSpan.FromMinutes(5);

    private readonly ILogger<KubernetesPodLogFileWriterService> _logger;
    private readonly KubernetesPodLogWatcherService _logWatcher;

    // Newest timestamp accepted into a buffer, per pod; used to drop old/duplicate lines.
    private readonly ConcurrentDictionary<string, DateTime> _lastLogTimestampCache;

    // Newest timestamp known to be on disk, per pod; this is what the checkpoint file stores.
    private readonly ConcurrentDictionary<string, DateTime> _flushedTimestamps;

    // Pending text per pod. Each StringBuilder doubles as the lock object for its pod.
    private readonly ConcurrentDictionary<string, StringBuilder> _logBuffers;

    // Serialises writers of the checkpoint file.
    private readonly object _checkpointLock = new object();

    private readonly string _checkpointFilePath = "pod_log_checkpoints.txt";
    private readonly string _logDirectory = "pod_logs";
    private Timer _flushTimer;
    private Guid _subscriptionId;

    /// <summary>
    /// Creates the service, ensures the log directory exists and loads any existing checkpoints.
    /// </summary>
    /// <param name="logger">Logger used to report file write failures.</param>
    /// <param name="logWatcher">Source of pod log lines.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="logWatcher"/> is <c>null</c>.
    /// </exception>
    public KubernetesPodLogFileWriterService(ILogger<KubernetesPodLogFileWriterService> logger, KubernetesPodLogWatcherService logWatcher)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _logWatcher = logWatcher ?? throw new ArgumentNullException(nameof(logWatcher));
        _lastLogTimestampCache = new ConcurrentDictionary<string, DateTime>();
        _flushedTimestamps = new ConcurrentDictionary<string, DateTime>();
        _logBuffers = new ConcurrentDictionary<string, StringBuilder>();

        Directory.CreateDirectory(_logDirectory);
        LoadCheckpoints();
    }

    /// <summary>Subscribes to the log watcher and starts the periodic flush timer.</summary>
    /// <param name="cancellationToken">Not used; start-up completes synchronously.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _subscriptionId = _logWatcher.Subscribe(ProcessLog);
        _flushTimer = new Timer(FlushBuffers, null, FlushInterval, FlushInterval);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Unsubscribes from the log watcher, stops the periodic flush and writes out
    /// whatever is still buffered.
    /// </summary>
    /// <param name="cancellationToken">Not used; the final flush runs synchronously.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logWatcher.Unsubscribe(_subscriptionId);

        // Stop further timer ticks so no periodic flush is scheduled after shutdown.
        _flushTimer?.Change(Timeout.Infinite, Timeout.Infinite);

        FlushBuffers(null);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles one log line: if its leading timestamp is newer than the newest one accepted
    /// for the pod, the line is buffered; once the buffer reaches the flush threshold it is
    /// written to disk and the checkpoint is updated.
    /// </summary>
    /// <param name="podName">Pod the line belongs to; null or empty names are ignored.</param>
    /// <param name="logLine">Raw log line; null lines and lines without a parsable timestamp are ignored.</param>
    private void ProcessLog(string podName, string logLine)
    {
        if (string.IsNullOrEmpty(podName) || logLine == null)
        {
            return;
        }

        if (!TryParseLogTimestamp(logLine, out var logTimestamp))
        {
            return;
        }

        // Factory overload: only allocates a StringBuilder when the pod is seen for the first time.
        var buffer = _logBuffers.GetOrAdd(podName, _ => new StringBuilder());
        var wroteToDisk = false;

        // The timestamp check, the cache update and the append must be atomic per pod,
        // otherwise two concurrent callbacks could both pass the check or interleave with a flush.
        lock (buffer)
        {
            if (logTimestamp <= _lastLogTimestampCache.GetOrAdd(podName, DateTime.MinValue))
            {
                return;
            }

            _lastLogTimestampCache[podName] = logTimestamp;
            buffer.AppendLine(logLine);

            if (buffer.Length >= FlushThresholdChars)
            {
                // Monitor locks are re-entrant, so WriteLogBuffer may take the same lock again.
                wroteToDisk = WriteLogBuffer(podName);
            }
        }

        if (wroteToDisk)
        {
            SaveCheckpoint();
        }
    }

    /// <summary>
    /// Appends the pod's buffered text to its log file and clears the buffer.
    /// On an I/O failure the error is logged and the buffer is kept so a later flush can retry.
    /// </summary>
    /// <param name="podName">Pod whose buffer should be written.</param>
    /// <returns><c>true</c> if data was written to disk; <c>false</c> if there was nothing to write or the write failed.</returns>
    private bool WriteLogBuffer(string podName)
    {
        if (!_logBuffers.TryGetValue(podName, out var buffer))
        {
            return false;
        }

        lock (buffer)
        {
            if (buffer.Length == 0)
            {
                return false;
            }

            var logFilePath = Path.Combine(_logDirectory, $"{ToSafeFileName(podName)}.log");

            try
            {
                File.AppendAllText(logFilePath, buffer.ToString());
            }
            catch (Exception ex) when (ex is IOException || ex is UnauthorizedAccessException)
            {
                _logger.LogError(ex, "Failed to append buffered logs for pod {PodName} to {LogFilePath}", podName, logFilePath);
                return false;
            }

            buffer.Clear();

            // Only now is everything up to the newest accepted timestamp durable,
            // so only now may the checkpoint for this pod advance.
            if (_lastLogTimestampCache.TryGetValue(podName, out var newestAccepted))
            {
                _flushedTimestamps[podName] = newestAccepted;
            }

            return true;
        }
    }

    /// <summary>
    /// Writes every non-empty pod buffer to disk and, if anything was written, saves the
    /// checkpoint file once. Used as the timer callback and for the final flush on stop.
    /// </summary>
    /// <param name="state">Timer state; not used.</param>
    private void FlushBuffers(object state)
    {
        try
        {
            var wroteAny = false;
            foreach (var podName in _logBuffers.Keys)
            {
                wroteAny |= WriteLogBuffer(podName);
            }

            if (wroteAny)
            {
                SaveCheckpoint();
            }
        }
        catch (Exception ex)
        {
            // This runs as a timer callback on a thread-pool thread, where an escaping
            // exception would be unhandled; log it and let the next tick retry.
            _logger.LogError(ex, "Unexpected error while flushing pod log buffers");
        }
    }

    /// <summary>
    /// Loads <c>podName,timestamp</c> lines from the checkpoint file, if it exists.
    /// Lines that do not have that shape or whose timestamp cannot be parsed are skipped.
    /// </summary>
    private void LoadCheckpoints()
    {
        if (!File.Exists(_checkpointFilePath))
        {
            return;
        }

        foreach (var line in File.ReadAllLines(_checkpointFilePath))
        {
            // Split on the last comma: the round-trip timestamp format never contains one.
            var separatorIndex = line.LastIndexOf(',');
            if (separatorIndex <= 0)
            {
                continue;
            }

            if (TryParseUtc(line.Substring(separatorIndex + 1), out var timestamp))
            {
                var podName = line.Substring(0, separatorIndex);
                _lastLogTimestampCache[podName] = timestamp;
                _flushedTimestamps[podName] = timestamp;
            }
        }
    }

    /// <summary>
    /// Rewrites the checkpoint file with the newest flushed timestamp of every pod,
    /// one <c>podName,timestamp</c> line each (round-trip "O" format).
    /// Write failures are logged and otherwise ignored; the next save will try again.
    /// </summary>
    private void SaveCheckpoint()
    {
        lock (_checkpointLock)
        {
            var checkpointData = new StringBuilder();
            foreach (var kvp in _flushedTimestamps)
            {
                checkpointData.AppendLine($"{kvp.Key},{kvp.Value:O}");
            }

            try
            {
                File.WriteAllText(_checkpointFilePath, checkpointData.ToString());
            }
            catch (Exception ex) when (ex is IOException || ex is UnauthorizedAccessException)
            {
                _logger.LogError(ex, "Failed to write checkpoint file {CheckpointFilePath}", _checkpointFilePath);
            }
        }
    }

    /// <summary>
    /// Tries to read a timestamp from the first 20 characters of a log line.
    /// </summary>
    /// <param name="logLine">Log line to inspect; must be longer than 20 characters to be considered.</param>
    /// <param name="timestamp">Parsed UTC timestamp, or <see cref="DateTime.MinValue"/> on failure.</param>
    /// <returns><c>true</c> if a timestamp was parsed; otherwise <c>false</c>.</returns>
    private bool TryParseLogTimestamp(string logLine, out DateTime timestamp)
    {
        timestamp = DateTime.MinValue;

        if (logLine == null || logLine.Length <= TimestampPrefixLength)
        {
            return false;
        }

        if (TryParseUtc(logLine.Substring(0, TimestampPrefixLength), out var parsedTimestamp))
        {
            timestamp = parsedTimestamp;
            return true;
        }

        return false;
    }

    /// <summary>
    /// Parses a timestamp culture-independently and normalises it to UTC. Text without
    /// zone information is taken to be UTC already, so log timestamps and reloaded
    /// checkpoints are always compared on the same time scale.
    /// </summary>
    private static bool TryParseUtc(string text, out DateTime timestamp)
    {
        return DateTime.TryParse(
            text,
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AssumeUniversal | System.Globalization.DateTimeStyles.AdjustToUniversal,
            out timestamp);
    }

    /// <summary>
    /// Replaces characters that are not valid in a file name with <c>_</c>, so a pod name
    /// can never address a file outside the log directory.
    /// </summary>
    private static string ToSafeFileName(string podName)
    {
        var invalidChars = Path.GetInvalidFileNameChars();
        var safeName = new StringBuilder(podName.Length);
        foreach (var c in podName)
        {
            safeName.Append(Array.IndexOf(invalidChars, c) >= 0 ? '_' : c);
        }

        return safeName.ToString();
    }

    /// <summary>Releases the flush timer.</summary>
    public void Dispose()
    {
        _flushTimer?.Dispose();
    }
}