Created
June 10, 2025 08:51
-
-
Save ramonsmits/a10e28595a995e9808aeaecc814b709c to your computer and use it in GitHub Desktop.
NServiceBus - Watchdog that will execute a FailFast if no message processing happened for 5 minutes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Pipeline behavior that records the time of every incoming logical message and runs a
/// watchdog thread which terminates the process via <see cref="Environment.FailFast(string)"/>
/// when no message has passed through the behavior for <see cref="watchdogTimeout"/>.
/// </summary>
/// <remarks>
/// All state is static: every instance shares a single watchdog thread and a single
/// "last activity" timestamp. Disposing any instance stops the shared watchdog; constructing
/// a new instance afterwards starts it again.
/// </remarks>
sealed class WatchdogBehavior : Behavior<IIncomingLogicalMessageContext>, IDisposable
{
    /// <summary>Maximum time without message activity before the process is terminated.</summary>
    static readonly TimeSpan watchdogTimeout = TimeSpan.FromMinutes(5);

    /// <summary>Guards starting and stopping of the shared watchdog thread.</summary>
    static readonly object gate = new object();

    // UTC ticks of the most recent activity. A long field cannot be declared volatile in C#
    // (CS0677), so it is only accessed through Interlocked, which is also atomic on 32-bit.
    static long lastActivityTicks = DateTime.UtcNow.Ticks;

    static Thread? watchdogThread;
    static CancellationTokenSource? cancellationTokenSource;

    /// <summary>Creates the behavior and makes sure the shared watchdog thread is running.</summary>
    public WatchdogBehavior()
    {
        StartWatchdog();
    }

    /// <summary>
    /// Records the current UTC time as the last activity and continues the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context (not inspected).</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        Interlocked.Exchange(ref lastActivityTicks, DateTime.UtcNow.Ticks);
        return next();
    }

    /// <summary>Starts the shared watchdog thread unless one is already running.</summary>
    static void StartWatchdog()
    {
        lock (gate)
        {
            // A non-null source means a watchdog is active and has not been stopped by Dispose.
            if (cancellationTokenSource != null)
            {
                return;
            }

            var source = new CancellationTokenSource();
            // Capture the token now; the thread must never touch the static field, which
            // Dispose clears and whose source it disposes.
            var token = source.Token;

            // Start the timeout window fresh so a restart after a long stop does not
            // terminate the process immediately because of a stale timestamp.
            Interlocked.Exchange(ref lastActivityTicks, DateTime.UtcNow.Ticks);

            cancellationTokenSource = source;
            watchdogThread = new Thread(() => Watch(token))
            {
                // Background: the watchdog must not keep the process alive on its own.
                IsBackground = true,
                Name = "NSB-Watchdog"
            };
            watchdogThread.Start();
        }
    }

    /// <summary>
    /// Watchdog loop: fails fast once the time since the last activity exceeds
    /// <see cref="watchdogTimeout"/>; returns when <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="token">Token that is cancelled to stop the watchdog.</param>
    static void Watch(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var lastActivity = new DateTime(Interlocked.Read(ref lastActivityTicks), DateTimeKind.Utc);
            var timeSinceActivity = DateTime.UtcNow - lastActivity;

            if (timeSinceActivity > watchdogTimeout)
            {
                Environment.FailFast($"Message processing stalled. Last activity: {lastActivity:yyyy-MM-dd HH:mm:ss}");
            }

            var nextCheck = timeSinceActivity < watchdogTimeout / 2
                ? TimeSpan.FromMinutes(1)   // Check less frequently when healthy
                : TimeSpan.FromSeconds(15); // Check more frequently when approaching timeout

            // Wait on the cancellation handle instead of sleeping so that stopping the
            // watchdog takes effect immediately rather than after the full interval.
            if (token.WaitHandle.WaitOne(nextCheck))
            {
                return;
            }
        }
    }

    /// <summary>
    /// Stops the shared watchdog thread and releases its cancellation source.
    /// Safe to call more than once; later calls are no-ops until the watchdog is started again.
    /// </summary>
    public void Dispose()
    {
        CancellationTokenSource? source;
        Thread? thread;

        lock (gate)
        {
            source = cancellationTokenSource;
            thread = watchdogThread;
            cancellationTokenSource = null;
            watchdogThread = null;
        }

        if (source == null)
        {
            return;
        }

        source.Cancel();
        // The thread wakes as soon as the token is cancelled; wait for it to exit so the
        // source (and its wait handle) is not disposed while the thread still uses it.
        thread?.Join();
        source.Dispose();
    }
}
/// <summary>
/// NServiceBus feature that registers a <see cref="WatchdogStartupTask"/> as a startup task.
/// </summary>
/// <remarks>
/// NOTE(review): nothing here adds <see cref="WatchdogBehavior"/> to the message pipeline;
/// presumably that registration happens elsewhere - confirm against the endpoint configuration.
/// </remarks>
sealed class WatchdogFeature : Feature
{
    /// <summary>Registers the watchdog startup task with the feature configuration.</summary>
    /// <param name="context">The feature configuration context the startup task is registered on.</param>
    protected override void Setup(FeatureConfigurationContext context) =>
        context.RegisterStartupTask(new WatchdogStartupTask());
}
/// <summary>
/// Startup task that creates a <see cref="WatchdogBehavior"/> when the task starts and
/// disposes it when the task stops.
/// </summary>
sealed class WatchdogStartupTask : FeatureStartupTask, IDisposable
{
    WatchdogBehavior? watchdog;

    /// <summary>Creates the watchdog behavior; constructing it starts the watchdog.</summary>
    /// <param name="session">The message session (unused).</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <returns>A completed task.</returns>
    protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
    {
        watchdog = new WatchdogBehavior();
        return Task.CompletedTask;
    }

    /// <summary>Disposes the watchdog behavior created by <see cref="OnStart"/>, if any.</summary>
    /// <param name="session">The message session (unused).</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <returns>A completed task.</returns>
    protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken)
    {
        DisposeWatchdog();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the watchdog behavior, if any. Is here just in case; NServiceBus does not invoke Dispose.
    /// </summary>
    public void Dispose() => DisposeWatchdog();

    /// <summary>
    /// Disposes the current watchdog at most once: the field is cleared before disposing so
    /// that <see cref="OnStop"/> followed by <see cref="Dispose"/> does not dispose the same
    /// instance twice.
    /// </summary>
    void DisposeWatchdog()
    {
        var current = watchdog;
        watchdog = null;
        current?.Dispose();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment