using SM.BrokerDomain.DTO.Configuration;
using SM.Common.BLL.AppServices.Statistics;
using SM.Common.BLL.Helpers;
using SM.Common.DTO.Configuration;
using SM.Common.DTO.Exceptions;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SM.BrokerDomain.BLL
{
    /// <summary>
    /// Maintains a concurrent queue of the specified type.
    /// Provides a method for enqueuing logs and methods for dequeuing batches of logs, passing them to a write callback,
    /// and handling error/load conditions; incoming logs are ignored while the service is stopped.
    /// </summary>
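    /// <example>
    /// A minimal usage sketch; the log type and writer shown here are hypothetical, and any callback
    /// matching the <see cref="FlushAsync"/> signature works:
    /// <code>
    /// var queue = new ConcurrentLogQueue&lt;RequestLog&gt;(logWriteBatchSize: 500);
    /// queue.Enabled = true;
    /// queue.Enqueue(new RequestLog { Message = "GET /quotes" });
    /// //From a periodic background worker:
    /// await queue.FlushAsync(CancellationToken.None, (batch, ct) =&gt; bulkLogWriter.WriteBatchAsync(batch, ct));
    /// </code>
    /// </example>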
    public class ConcurrentLogQueue<T>
    {
        const int PerformanceCounterSamples = 30;
        const int MaxEnqueuesBetweenThrottlingChecks = 500;
        [Obsolete("Failed writes are no longer retried: retrying lets logs pile up in memory, so we throttle and drop logs instead to keep up with their production in real time.")]
        public static readonly int MaxFailedWriteAttempts = Config.TryGet(ConfigurationKeys.LoggingMaxFailedWriteAttempts, 60);
        /// <summary>
        /// Max number of logs we'll allow to queue in memory before throttling new logs and dropping batches of logs to play catch-up when log flushes cannot keep up with the volume.
        /// </summary>
        public static readonly int MaxEnqueuedLogs = Config.TryGet(ConfigurationKeys.LoggingMaxEnqueuedLogs, 20000);
        private ConcurrentQueue<T> _logs;
        private bool _dumpOnNextFailure;
        private int logsAdded = 0;
        private volatile bool _throttling = false;
        private string _logType;
        private int _logWriteBatchSize;
        private bool _enabled = false;
        #region Statistics
        private long _failedWriteAttempts; //internal/cyclical; do not reset or return this value with statistics
        private long _totalFailedWriteAttempts;
        private long _totalLogsWritten;
        private long _totalLogsDumped;
        private long _totalLogsThrottled;
        private long _maxLogsDequeuedAtOnce; //max logs observed in the queue at the time Flush was called
        private long _maxDatabaseWriteTime;
        private long _maxAwsWriteTime;
        private OperationPerformanceCounter _perfCounter;
        public void ResetStatistics()
        {
            Interlocked.Exchange(ref _totalFailedWriteAttempts, 0L);
            Interlocked.Exchange(ref _totalLogsWritten, 0L);
            Interlocked.Exchange(ref _totalLogsDumped, 0L);
            Interlocked.Exchange(ref _totalLogsThrottled, 0L);
            Interlocked.Exchange(ref _maxLogsDequeuedAtOnce, 0L);
            Interlocked.Exchange(ref _maxAwsWriteTime, 0L);
            Interlocked.Exchange(ref _maxDatabaseWriteTime, 0L);
        }
        #endregion
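        /// <summary>
        /// Creates the queue with the given write batch size; values below 100 are raised to 100.
        /// </summary>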
        public ConcurrentLogQueue(int logWriteBatchSize)
        {
            if (logWriteBatchSize < 100)
                logWriteBatchSize = 100; //log batch size should never be too low; too much overhead
            _logs = new ConcurrentQueue<T>();
            _dumpOnNextFailure = false;
            _logType = typeof(T).Name;
            _logWriteBatchSize = logWriteBatchSize;
            _failedWriteAttempts = 0;
            _totalLogsDumped = 0;
            _totalLogsThrottled = 0;
            _maxLogsDequeuedAtOnce = 0;
            _maxDatabaseWriteTime = 0;
            _maxAwsWriteTime = 0;
            _perfCounter = new OperationPerformanceCounter(PerformanceCounterSamples);
        }
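        /// <summary>
        /// Gates whether <see cref="Enqueue"/> retains logs. Disabling the queue also resets its statistics.
        /// </summary>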
        public bool Enabled
        {
            get => _enabled;
            set
            {
                _enabled = value;
                if (!_enabled)
                    ResetStatistics();
            }
        }
        public bool IsEmpty => _logs.IsEmpty;
        public int LogWriteBatchSize => _logWriteBatchSize;
        public long TotalLogsDumped => Interlocked.Read(ref _totalLogsDumped);
        public long TotalLogsThrottled => Interlocked.Read(ref _totalLogsThrottled);
        public long TotalFailedWriteAttempts => Interlocked.Read(ref _totalFailedWriteAttempts);
        public long TotalLogsWritten => Interlocked.Read(ref _totalLogsWritten);
        public TimeSpan MaxDatabaseWriteTime => TimeSpan.FromTicks(Interlocked.Read(ref _maxDatabaseWriteTime));
        public TimeSpan MaxAwsWriteTime => TimeSpan.FromTicks(Interlocked.Read(ref _maxAwsWriteTime));
        public bool Throttling => _throttling;
        public int LogsEnqueued => _logs.Count;
        public double LogsPerSecondBurst => _perfCounter.OperationsPerSecondBurst;
        public double LogsPerSecondRealTime => _perfCounter.OperationsPerSecondRealTime;
        public double MaxLogsDequeuedAtOnce => Interlocked.Read(ref _maxLogsDequeuedAtOnce); //Interlocked.Read for a consistent 64-bit read, as with the other counters
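        /// <summary>
        /// Enqueues a log entry. While throttling, the entry is dropped and counted as throttled;
        /// while the queue is disabled, nothing is retained.
        /// </summary>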
        public void Enqueue(T log)
        {
            //When not enabled (i.e. when the logging service is off) we do not retain logs.
            if (Enabled)
            {
                //Note: When enforcing the upper limit, we check _logs.Count only once every MaxEnqueuesBetweenThrottlingChecks (500) enqueues
                //because Count may not perform well under high load: https://github.com/dotnet/corefx/issues/29759
                if (Interlocked.Increment(ref logsAdded) >= MaxEnqueuesBetweenThrottlingChecks)
                {
                    //Reset the "log count" sampling counter
                    Interlocked.Exchange(ref logsAdded, 0);
                    //Update throttling; always set the value here to ensure it cannot get stuck on as long as items are being added to the queue.
                    _throttling = _logs.Count > MaxEnqueuedLogs;
                }
                if (_throttling)
                    Interlocked.Increment(ref _totalLogsThrottled);
                else
                    _logs.Enqueue(log);
            }
        }
        /// <summary>
        /// Dequeues and discards all currently queued logs.
        /// </summary>
        public void Dump()
        {
            DequeueBatch();
        }
        [Obsolete("The code was updated to drop logs to keep up with their creation, so they are now dropped immediately on a failure to write.")]
        public void DumpOnNextFailure()
        {
            _dumpOnNextFailure = true;
        }
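        /// <summary>
        /// Dequeues the logs present when the flush starts, in batches of <see cref="LogWriteBatchSize"/>,
        /// and passes each batch to <paramref name="asyncWriteCallback"/>. Batches are dropped rather than
        /// retried when a write fails or the queue is over <see cref="MaxEnqueuedLogs"/>.
        /// </summary>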
        public async Task FlushAsync(CancellationToken writeCancellationToken, Func<List<T>, CancellationToken, Task<BulkLogWriter.BulkLogWriterResult>> asyncWriteCallback)
        {
            var writeTimer = new Stopwatch();
            List<T> dequeuedLogs;
            bool errorsLogged = false; //Ensure errors are logged once per flush, not once per batch
            //To honor "Flush" semantics, we commit to writing only a finite number of logs (what's currently in the queue) and then return.
            int logsToDequeue = _logs.Count;
            InterlockedExtensions.ExchangeIfGreaterThan(ref _maxLogsDequeuedAtOnce, logsToDequeue);
            while (logsToDequeue > 0)
            {
                //Dequeue a batch of logs
                dequeuedLogs = DequeueBatch(Math.Min(_logWriteBatchSize, logsToDequeue));
                logsToDequeue -= _logWriteBatchSize; //subtract the full batch size (even if fewer were dequeued) so the loop always terminates
                //As long as we're over the limit for enqueued logs, drop the batch and continue to the next one.
                //We have to drop logs to play catch-up with the production of logs without exceeding allowed memory or blocking sampling of new logs.
                if (_logs.Count > MaxEnqueuedLogs)
                {
                    Interlocked.Add(ref _totalLogsThrottled, dequeuedLogs.Count); //TODO: Make this a separate statistic, since it drops an entire batch rather than single logs like Enqueue does
                    continue;
                }
                //Reset the throttling flag after taking enough batches from the queue to keep things moving.
                //Throttling should really only occur while we're waiting on a write and have enqueued the maximum number of logs.
                _throttling = false;
                try
                {
                    //Begin writing logs. Do not allow cancellation of the write itself, but stop waiting if writeCancellationToken is cancelled.
                    writeTimer.Restart();
                    //NOTE: asyncWriteCallback generally does not throw exceptions; rather, they're collected in the result for each write path
                    var writeResult = await asyncWriteCallback(dequeuedLogs, writeCancellationToken).ConfigureAwait(false);
                    var logsWritten = writeResult.LogsWritten;
                    //If writing to the database and/or AWS succeeded, count the logs towards successful performance metrics.
                    var wroteToDatabase = writeResult.WroteToDatabase && writeResult.DatabaseWriteExceptions.None();
                    var wroteToAWS = writeResult.WroteToAWS && writeResult.AwsWriteExceptions.None();
                    if (wroteToDatabase || wroteToAWS)
                    {
                        if (wroteToDatabase)
                            InterlockedExtensions.ExchangeIfGreaterThan(ref _maxDatabaseWriteTime, writeResult.DatabaseWriteTime.Ticks);
                        if (wroteToAWS)
                            InterlockedExtensions.ExchangeIfGreaterThan(ref _maxAwsWriteTime, writeResult.AwsWriteTime.Ticks);
                        //Note that we receive the number of logs written from asyncWriteCallback instead of using dequeuedLogs.Count,
                        //because type T sometimes represents a *set* of log entries (for more efficient concurrent enqueuing) rather than individual log entries,
                        //so the number of dequeuedLogs entries is not necessarily 1-to-1 with the number of log rows written.
                        _perfCounter.Add(writeTimer.Elapsed, logsWritten);
                        Interlocked.Add(ref _totalLogsWritten, (long)logsWritten); //Note: Only this measure counts actual log records written; all others count enqueued entries.
                    }
                    //If writing to the database and/or AWS was attempted and failed, report the errors in system logs (only the first failure per flush, not for every batch).
                    var errorWritingLogsToDatabase = writeResult.WroteToDatabase && writeResult.DatabaseWriteExceptions.Any();
                    var errorWritingLogsToAWS = writeResult.WroteToAWS && writeResult.AwsWriteExceptions.Any();
                    if (errorWritingLogsToDatabase || errorWritingLogsToAWS)
                    {
                        if (!errorsLogged)
                        {
                            errorsLogged = true;
                            if (errorWritingLogsToDatabase)
                            {
                                var errorMessage = "Logging failed writing to database: " + new AggregateException(writeResult.DatabaseWriteExceptions).FlattenMessage();
                                Logging.System.LogException(errorMessage, null, writeResult.DatabaseWriteExceptions.FirstOrDefault()?.StackTrace, "logging", null);
                            }
                            if (errorWritingLogsToAWS)
                            {
                                var errorMessage = "Logging failed writing to AWS: " + new AggregateException(writeResult.AwsWriteExceptions).FlattenMessage();
                                Logging.System.LogException(errorMessage, null, writeResult.AwsWriteExceptions.FirstOrDefault()?.StackTrace, "logging", null);
                            }
                        }
                        throw new Exception("Error writing logs."); //Invoke the catch block below
                    }
                }
                catch (Exception ex)
                {
                    //Count the failed write; dump the batch once the dump flag is set or the maximum number of allowed failed attempts is reached
                    Interlocked.Increment(ref _failedWriteAttempts);
                    Interlocked.Increment(ref _totalFailedWriteAttempts);
                    var failedWriteAttempts = Interlocked.Read(ref _failedWriteAttempts);
                    if (failedWriteAttempts >= MaxFailedWriteAttempts)
                        _dumpOnNextFailure = true;
                    if (_dumpOnNextFailure)
                    {
                        Interlocked.Add(ref _totalLogsDumped, (long)dequeuedLogs.Count);
                        LogFailure(dequeuedLogs, ex);
                    }
                    //We no longer reenqueue logs on failure; rather, we throttle and drop logs on failure to keep up.
                    //Also, because we're writing to multiple destinations (database and AWS), we don't want to repeat writes that succeeded on one destination but failed on the other.
                    //else if (_logs.Count < MaxEnqueuedLogs)
                    //    ReenqueueBatch(dequeuedLogs);
                }
                //Reset the dump flag after each cycle
                if (_dumpOnNextFailure)
                {
                    _dumpOnNextFailure = false;
                    Interlocked.Exchange(ref _failedWriteAttempts, 0L);
                }
            }
            //Always cancel throttling on return
            _throttling = false;
        }
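        //For reference: InterlockedExtensions.ExchangeIfGreaterThan (from SM.Common.BLL.Helpers) is not shown in this file.
        //Presumably it is a compare-and-swap loop along these lines (a sketch under that assumption, not the actual implementation):
        //  public static void ExchangeIfGreaterThan(ref long location, long value)
        //  {
        //      long current = Interlocked.Read(ref location);
        //      while (value > current)
        //      {
        //          long original = Interlocked.CompareExchange(ref location, value, current);
        //          if (original == current)
        //              return; //swap succeeded
        //          current = original; //another thread changed the value; re-check against it
        //      }
        //  }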
        private void LogFailure(List<T> dequeuedLogs, Exception ex)
        {
            //Try to log the exception that caused the dequeued logs to be dropped.
            try
            {
                Func<Exception, bool> filterUselessMessages = x => !(string.IsNullOrWhiteSpace(x.Message) || x.Message.EndsWith("inner exception for details."));
                var flattenedMessage = ex.FlattenMessage(" --- ", filterUselessMessages);
                Logging.System.LogException($"Error writing '{_logType}' logs. Max write attempts exceeded trying to write {dequeuedLogs.Count} dequeued logs. Error message: {flattenedMessage}", DateTime.UtcNow, ex.StackTrace);
            }
            catch { } //Never let a failure in the logger itself throw
        }
        /// <summary>
        /// Dequeues up to the requested number of log entries; when no count is given,
        /// snapshots the current log count and dequeues that many.
        /// </summary>
        private List<T> DequeueBatch(int? numberToDequeue = null)
        {
            List<T> dequeuedLogs = new List<T>();
            //If the batch size to dequeue is not provided, dequeue everything currently in the queue
            if (!numberToDequeue.HasValue)
                numberToDequeue = _logs.Count;
            while (!_logs.IsEmpty && dequeuedLogs.Count < numberToDequeue) //"<" (not "<=") so we never take more than requested
            {
                if (_logs.TryDequeue(out T log))
                    dequeuedLogs.Add(log);
            }
            return dequeuedLogs;
        }
        /// <summary>
        /// Reenqueues a batch of dequeued logs. No longer called on write failure
        /// (see the commented-out call in FlushAsync); kept for reference.
        /// </summary>
        private void ReenqueueBatch(List<T> dequeuedLogs)
        {
            foreach (var log in dequeuedLogs)
                _logs.Enqueue(log);
        }
    }
}