Skip to content

Instantly share code, notes, and snippets.

Created November 22, 2019 05:43
Show Gist options
  • Save tnayanam/cf18c76dd05499204c19ae473769048e to your computer and use it in GitHub Desktop.
Save tnayanam/cf18c76dd05499204c19ae473769048e to your computer and use it in GitHub Desktop.
using SM.BrokerDomain.DTO.Configuration;
using SM.Common.BLL.AppServices.Statistics;
using SM.Common.BLL.Helpers;
using SM.Common.DTO.Configuration;
using SM.Common.DTO.Exceptions;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace SM.BrokerDomain.BLL
/// <summary>
/// Maintains a concurrent queue of the specified type.
/// Provides a method for enqueuing logs, and methods for dequeueing a batch of logs, passing them to a write callback,
/// and handling error/load issues, as well as ignored logs when service is stopped.
/// </summary>
public class ConcurrentLogQueue<T>
const int PerformanceCounterSamples = 30;
const int MaxEnqueuesBetweenThrottlingChecks = 500;
[Obsolete("Instead of allowing multiple write attempts on failure, because logs could pile up in memory, we throttle and drop logs to keep up with the production of logs in real-time.")]
public static readonly int MaxFailedWriteAttempts = Config.TryGet(ConfigurationKeys.LoggingMaxFailedWriteAttempts, 60);
/// <summary>
/// Max number of logs we'll allow to queue in memory before throttlng new logs and dropping batches of logs to play catch-up when log flushes cannot keep up with the volume.
/// </summary>
public static readonly int MaxEnqueudLogs = Config.TryGet(ConfigurationKeys.LoggingMaxEnqueuedLogs, 20000);
private ConcurrentQueue<T> _logs;
private bool _dumpOnNextFailure;
private int logsAdded = 0;
private volatile bool _throttling = false;
private string _logType;
private int _logWriteBatchSize;
private bool _enabled = false;
#region Statistics
private long _failedWriteAttempts; //internal/cyclical, do not reset or return value with statistics
private long _totalFailedWriteAttempts;
private long _totalLogsWritten;
private long _totalLogsDumped;
private long _totalLogsThrottled;
private long _maxLogsDequeuedAtOnce; //max logs present when Flush was called (max observed in queue at time of flush)
private long _maxDatabaseWriteTime;
private long _maxAwsWriteTime;
private OperationPerformanceCounter _perfCounter;
public void ResetStatistics()
Interlocked.Exchange(ref _totalFailedWriteAttempts, 0L);
Interlocked.Exchange(ref _totalLogsWritten, 0L);
Interlocked.Exchange(ref _totalLogsDumped, 0L);
Interlocked.Exchange(ref _totalLogsThrottled, 0L);
Interlocked.Exchange(ref _maxLogsDequeuedAtOnce, 0L);
Interlocked.Exchange(ref _maxAwsWriteTime, 0L);
Interlocked.Exchange(ref _maxDatabaseWriteTime, 0L);
public ConcurrentLogQueue(int logWriteBatchSize)
if (logWriteBatchSize < 100)
logWriteBatchSize = 100; //log batch size should never be too low; too much overhead
_logs = new ConcurrentQueue<T>();
_dumpOnNextFailure = false;
_logType = typeof(T).Name;
_logWriteBatchSize = logWriteBatchSize;
_failedWriteAttempts = 0;
_totalLogsDumped = 0;
_totalLogsThrottled = 0;
_maxLogsDequeuedAtOnce = 0;
_maxDatabaseWriteTime = 0;
_maxAwsWriteTime = 0;
_perfCounter = new OperationPerformanceCounter(PerformanceCounterSamples);
public bool Enabled
get => _enabled;
set {
_enabled = value;
if (!_enabled)
public bool IsEmpty => _logs.IsEmpty;
public int LogWriteBatchSize => _logWriteBatchSize;
public long TotalLogsDumped => Interlocked.Read(ref _totalLogsDumped);
public long TotalLogsThrottled => Interlocked.Read(ref _totalLogsThrottled);
public long TotalFailedWriteAttempts => Interlocked.Read(ref _totalFailedWriteAttempts);
public long TotalLogsWritten => Interlocked.Read(ref _totalLogsWritten);
public TimeSpan MaxDatabaseWriteTime => TimeSpan.FromTicks(Interlocked.Read(ref _maxDatabaseWriteTime));
public TimeSpan MaxAwsWriteTime => TimeSpan.FromTicks(Interlocked.Read(ref _maxAwsWriteTime));
public bool Throttling => _throttling;
public int LogsEnqueued => _logs.Count;
public double LogsPerSecondBurst => _perfCounter.OperationsPerSecondBurst;
public double LogsPerSecondRealTime => _perfCounter.OperationsPerSecondRealTime;
public double MaxLogsDequeuedAtOnce => _maxLogsDequeuedAtOnce;
public void Enqueue(T log)
//When not enabled (i.e. when the logging service is off) we do not retain logs.
if (Enabled)
//Note: When enforcing upper limit, we check _logs.Count only every 1000 enqueues
//because it may not perform well under high load:
if (Interlocked.Increment(ref logsAdded) >= MaxEnqueuesBetweenThrottlingChecks)
//Reset "log Count" sampling counter
Interlocked.Exchange(ref logsAdded, 0);
//Update throttling; always set value here to ensure it cannot get stuck on as long as there are items being added to the queue.
_throttling = _logs.Count > MaxEnqueudLogs;
if (_throttling)
Interlocked.Increment(ref _totalLogsThrottled);
/// <summary>
/// Dequeues and discards all currently queue logs.
/// </summary>
public void Dump()
[Obsolete("Code was updated to drop logs to keep up with their creation, so they are dropped immediately on failure to write.")]
public void DumpOnNextFailure()
_dumpOnNextFailure = true;
public async Task FlushAsync(CancellationToken writeCancellationToken, Func<List<T>,CancellationToken,Task<BulkLogWriter.BulkLogWriterResult>> asyncWriteCallback)
var writeTimer = new Stopwatch();
List<T> dequeuedLogs;
bool errorsLogged = false; //Ensure errors are logged once per flush, not once per batch
//To honor "Flush" semantics, we must commit to writing only a finite number of logs -- what's currently in the queue -- and then return.
int logsToDequeue = _logs.Count;
InterlockedExtensions.ExchangeIfGreaterThan(ref _maxLogsDequeuedAtOnce, logsToDequeue);
while (logsToDequeue > 0)
//Dequeue a batch of logs
dequeuedLogs = DequeueBatch(Math.Min(_logWriteBatchSize, logsToDequeue));
logsToDequeue -= _logWriteBatchSize;
//As long as we're over the limit for enqueued logs, drop the batch and continue to the next one.
//We have to drop logs to play catch-up to keep up with the production of logs without exceeding allowed memory or blocking sampling of new logs.
if (_logs.Count > MaxEnqueudLogs)
Interlocked.Add(ref _totalLogsThrottled, dequeuedLogs.Count); //TODO: Make this a separate statistic, since it's dropping an entire batch, rather than single logs like Enqueue does
//Reset throttling flag after taking enough batches from the queue to keep things moving.
//Throttling should really only occur while we're waiting on a write and have enqueued the maximum number of logs.
_throttling = false;
//Begin writing logs. Do not allow cancellation of the write itself, but stop waiting if cancellationTokenSource.Token is cancelled.
//NOTE: asyncWriteCallback generally does not throw exceptions; rather, they're organized in the results for different write paths
var writeResult = await asyncWriteCallback(dequeuedLogs, writeCancellationToken).ConfigureAwait(false);
var logsWritten = writeResult.LogsWritten;
//If either writing to database and/or AWS succeeded, count the logs towards successful performance metrics.
var wroteToDatabase = writeResult.WroteToDatabase && writeResult.DatabaseWriteExceptions.None();
var wroteToAWS = writeResult.WroteToAWS && writeResult.AwsWriteExceptions.None();
if (wroteToDatabase || wroteToAWS)
if (wroteToDatabase)
InterlockedExtensions.ExchangeIfGreaterThan(ref _maxDatabaseWriteTime, writeResult.DatabaseWriteTime.Ticks);
if (wroteToAWS)
InterlockedExtensions.ExchangeIfGreaterThan(ref _maxAwsWriteTime, writeResult.AwsWriteTime.Ticks);
//Note that we recieve the number of logs written from asyncWriteCallback instead of using dequeuedLogs.Count,
//because type T sometimes represents a *set* of log entries (for more efficient concurrent enqueuing) rather than individual log entries,
//so the number of dequeuedLogs entries is not necessarily 1-to-1 with number of log rows written.
_perfCounter.Add(writeTimer.Elapsed, logsWritten);
Interlocked.Add(ref _totalLogsWritten, (long)logsWritten); //Note: Only this measure is actual log record written; all others involve enqueued entries.
//If either writing to database and/or AWS was attempted and failed, report the errors in system logs (only first failure per Flush, not for every batch).
var errorWritingLogsToDatabase = writeResult.WroteToDatabase && writeResult.DatabaseWriteExceptions.Any();
var errorWritingLogsToAWS = writeResult.WroteToAWS && writeResult.AwsWriteExceptions.Any();
if (errorWritingLogsToDatabase || errorWritingLogsToAWS)
if (!errorsLogged)
errorsLogged = true;
if (errorWritingLogsToDatabase)
var errorMessage = "Logging failed writing to database: " + new AggregateException(writeResult.DatabaseWriteExceptions).FlattenMessage();
Logging.System.LogException(errorMessage, null, writeResult.DatabaseWriteExceptions.FirstOrDefault()?.StackTrace, "logging", null);
if (errorWritingLogsToAWS)
var errorMessage = "Logging failed writing to AWS: " + new AggregateException(writeResult.AwsWriteExceptions).FlattenMessage();
Logging.System.LogException(errorMessage, null, writeResult.AwsWriteExceptions.FirstOrDefault()?.StackTrace, "logging", null);
throw new Exception("Error writing logs."); //Invoke catch block
catch (Exception ex)
//Reenqueue failed logs unless flagged to dump them or maximum number of allowed failed attempts is reached
Interlocked.Increment(ref _failedWriteAttempts);
Interlocked.Increment(ref _totalFailedWriteAttempts);
var failedWriteAttempts = Interlocked.Read(ref _failedWriteAttempts);
if (failedWriteAttempts >= MaxFailedWriteAttempts)
_dumpOnNextFailure = true;
if (_dumpOnNextFailure)
Interlocked.Add(ref _totalLogsDumped, (long)dequeuedLogs.Count);
LogFailure(dequeuedLogs, ex);
//We do not reenqueue logs on failure anymore; rather, we throttle and drop logs on failure to keep up.
//Also, because we're writing to multiple desinations (database and AWS), we don't want to repeat writes when they succeed on one destionation and fail on the other.
//else if (_logs.Count < MaxEnqueuedLogs)
// ReenqueueBatch(dequeuedLogs);
//Reset dump flag after each cycle
if (_dumpOnNextFailure)
_dumpOnNextFailure = false;
Interlocked.Exchange(ref _failedWriteAttempts, 0L);
//Always cancel throttling on return
_throttling = false;
private void LogFailure(List<T> dequeuedLogs, Exception ex)
//Try to log exception that necessitated reenqueuing logs.
try {
Func<Exception,bool> filterUselessMessages = x => !(string.IsNullOrWhiteSpace(x.Message) || x.Message.EndsWith("inner exception for details."));
var flattenedMessage = ex.FlattenMessage(" --- ", filterUselessMessages);
Logging.System.LogException($"Error writing '{_logType}' logs. Max write attempts exceeded trying to write {dequeuedLogs.Count} dequeued logs. Error message: {flattenedMessage}", DateTime.UtcNow, ex.StackTrace);
} catch (Exception ex2) { }
/// <summary>
/// Gets a snapshot of the current log count and then dequeues that many log entries.
/// </summary>
private List<T> DequeueBatch(int? numberToDequeue = null)
List<T> dequeuedLogs = new List<T>();
//If batch size to dequeue is not provided, dequeue everything
if (!numberToDequeue.HasValue)
numberToDequeue = _logs.Count();
while (!_logs.IsEmpty && dequeuedLogs.Count <= numberToDequeue)
if (_logs.TryDequeue(out T log))
return dequeuedLogs;
/// <summary>
/// Reenqueues a batch of logs, and writes a system log entry indicating a failure to write logs.
/// Includes the number of logs that failed to write and the time taken to reenqueue them.
/// </summary>
private void ReenqueueBatch(List<T> dequeuedLogs)
foreach (var log in dequeuedLogs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment