Created
November 21, 2019 20:20
-
-
Save tnayanam/da12cd4695e8aeaecb216f7fdc67b524 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace SM.BrokerDomain.BLL | |
{ | |
/// <summary> | |
/// Performs bulk-writes of logs to the database. | |
/// Do not use this class directly for logging. Instead, use the FederationLogger singleton for logging. | |
/// </summary> | |
public class BulkLogWriter | |
{ | |
/// <summary>
/// Configuration key of the boolean flag that LogAsync reads before forwarding logs to CloudWatch.
/// </summary>
private const string SEND_LOGS = "SM.Logging.FederationLogs.SendLogsToAWS.Enabled";

/// <summary>
/// Pairs the destination table name of a log entity type with the data reader type used to bulk-insert it.
/// </summary>
private class LogTypeInfo
{
    /// <summary>
    /// Creates a mapping entry.
    /// </summary>
    /// <param name="tableName">Table name handed to the bulk insert.</param>
    /// <param name="dataReaderType">Type instantiated (via Activator) to read the log entities.</param>
    public LogTypeInfo(string tableName, Type dataReaderType)
    {
        TableName = tableName;
        DataReaderType = dataReaderType;
    }

    /// <summary>Table name handed to the bulk insert.</summary>
    public string TableName { get; set; }

    /// <summary>Type of the data reader that wraps the log entities for the bulk insert.</summary>
    public Type DataReaderType { get; set; }
}
/// <summary>
/// Log entity types supported by LogAsync, each mapped to its destination table and bulk-insert data reader type.
/// </summary>
private static readonly Dictionary<Type, LogTypeInfo> logTypeInfos = new Dictionary<Type, LogTypeInfo>
{
    [typeof(FederationAuditLog)] = new LogTypeInfo("[Log].[FederationAuditLogs]", typeof(FederationAuditLogDataReader)),
    [typeof(FederationTraceLog)] = new LogTypeInfo("[Log].[FederationTraceLogs]", typeof(FederationTraceLogDataReader)),
    [typeof(PublishRestSucessLog)] = new LogTypeInfo("[Log].[PublishRestSucessLogs]", typeof(PublishRestSuccessLogDataReader)),
    [typeof(PublishRestErrorLog)] = new LogTypeInfo("[Log].[PublishRestErrorLogs]", typeof(PublishRestErrorLogDataReader)),
    [typeof(EcomRequestLog)] = new LogTypeInfo("[Log].[EcomRequestLogs]", typeof(EcomRequestLogDataReader)),
    [typeof(SystemLog)] = new LogTypeInfo("[Broker].[SystemLogs]", typeof(SystemLogDataReader)),
};
/// <summary>
/// Bulk-writes a set of log entities to the database and then, when the SEND_LOGS configuration flag is enabled,
/// forwards the same entities to CloudWatch Logs on a best-effort basis.
/// </summary>
/// <typeparam name="TLog">A log entity type that has a mapping in the logTypeInfos dictionary.</typeparam>
/// <param name="logs">The log entities to write. Enumerated once; the resulting collection is reused.</param>
/// <param name="cancellationToken">Passed through to the bulk insert.</param>
/// <param name="batchSize">Batch size handed to the bulk insert; a value below 1 uses the total number of logs.</param>
/// <param name="timeoutSeconds">Timeout value handed to the bulk insert.</param>
/// <exception cref="ArgumentNullException"><paramref name="logs"/> is null.</exception>
/// <exception cref="KeyNotFoundException"><typeparamref name="TLog"/> has no entry in logTypeInfos.</exception>
public static async Task LogAsync<TLog>(IEnumerable<TLog> logs, CancellationToken cancellationToken, int batchSize = 1000, int timeoutSeconds = 180)
{
    if (logs == null)
    {
        throw new ArgumentNullException(nameof(logs));
    }

    // Resolve the mapping before opening a context so an unsupported type fails fast with a clear message.
    LogTypeInfo logTypeInfo;
    if (!logTypeInfos.TryGetValue(typeof(TLog), out logTypeInfo))
    {
        throw new KeyNotFoundException($"No bulk-insert mapping is registered for log type '{typeof(TLog).FullName}'.");
    }

    // The sequence is read by the data reader factory, by the batch-size fallback and by the CloudWatch
    // forwarding; materialize it once so a lazy source is not re-evaluated for each of them.
    var entries = logs as ICollection<TLog> ?? logs.ToList();
    var effectiveBatchSize = (batchSize < 1) ? entries.Count : batchSize;

    using (var logsDbContext = RepoCache.CreateContextFor<TLog>())
    {
        await logsDbContext.BulkInsertAsync(
            logTypeInfo.TableName,
            () => (BulkInsertDataReader<TLog>)Activator.CreateInstance(logTypeInfo.DataReaderType, entries),
            effectiveBatchSize,
            timeoutSeconds,
            cancellationToken).ConfigureAwait(false);
    }

    try
    {
        var sendLogsToAWS = Config.TryGet(SEND_LOGS, false);
        if (sendLogsToAWS)
        {
            await SendLogsToCloudWatch<TLog>(entries).ConfigureAwait(false);
        }
    }
    catch (Exception)
    {
        // Deliberately swallowed: the database write above has already succeeded and CloudWatch forwarding
        // is best-effort. NOTE(review): nothing is logged here, presumably to avoid a failure in the logging
        // path generating further log writes - confirm before adding logging.
    }
}
#region CloudWatchLogs | |
// Backing field for logClient; stays null until the property is first read.
private static AmazonCloudWatchLogsClient _logClient;

/// <summary>
/// CloudWatch Logs client used by this class. Created on first access from ECSTaskCredentials and the
/// region reported by EnvironmentVariableAWSRegion, then cached in _logClient.
/// </summary>
public static AmazonCloudWatchLogsClient logClient
{
    get
    {
        var client = _logClient;
        if (client == null)
        {
            client = new AmazonCloudWatchLogsClient(new ECSTaskCredentials(), new EnvironmentVariableAWSRegion().Region);
            _logClient = client;
        }

        return client;
    }
}

// Exception message that identifies a missing log group (as opposed to a missing log stream).
private const string LOG_GROUP_NOTFOUND = "The specified log group does not exist.";

// Maximum number of events put into one batch. Actual Limit - 10000
private const int CLOUDWATCH_MAX_PAYLOAD_COUNT = 9000;

// Maximum accumulated size, in bytes, of one batch. Actual Limit - 1048576
private const int CLOUDWATCH_MAX_PAYLOAD_SIZE = 1048000;

// Per-event overhead, in bytes, added to every message size when sizing a batch.
private const int CLOUDWATCH_CONSTRAINT = 26;

// Maximum size, in bytes, of a single message before it is truncated. Actual Limit - 262144
private const int CLOUDWATCH_MAX_LOG_SIZE = 261900 - CLOUDWATCH_CONSTRAINT;
/// <summary>
/// CloudWatch settings and mutable upload state kept per log type (trace, audit, system, publish).
/// </summary>
private class AWSLogSetting
{
    /// <summary>Converts a log entity (passed as object) into the message text of a log event.</summary>
    public Func<object, string> ConversionFunction { get; set; }

    /// <summary>Name of the log stream the batches are sent to.</summary>
    public string LogStreamName { get; set; }

    /// <summary>Token returned by the previous upload; null means the stream is created before sending.</summary>
    public string SequenceToken { get; set; }

    /// <summary>Number of retries still allowed after a resource-not-found failure.</summary>
    public int RetryCount { get; set; }
}
/// <summary>
/// CloudWatch settings for each log entity type that can be forwarded, seeded from the corresponding logger's
/// stream name, retry count and sequence token.
/// </summary>
private static readonly Dictionary<Type, AWSLogSetting> awsLogTypeSettings = new Dictionary<Type, AWSLogSetting>
{
    [typeof(FederationAuditLog)] = new AWSLogSetting
    {
        LogStreamName = FederationLogger.auditStreamName,
        RetryCount = FederationLogger.auditRetryCount,
        SequenceToken = FederationLogger.auditSequenceToken,
        ConversionFunction = log => ConvertToAudit((FederationAuditLog)log)
    },
    [typeof(FederationTraceLog)] = new AWSLogSetting
    {
        LogStreamName = FederationLogger.traceStreamName,
        RetryCount = FederationLogger.traceRetryCount,
        SequenceToken = FederationLogger.traceSequenceToken,
        ConversionFunction = log => ConvertToTrace((FederationTraceLog)log)
    },
    [typeof(SystemLog)] = new AWSLogSetting
    {
        LogStreamName = SystemLogger.systemStreamName,
        RetryCount = SystemLogger.systemRetryCount,
        SequenceToken = SystemLogger.systemSequenceToken,
        ConversionFunction = log => ConvertToSystem((SystemLog)log)
    },
    [typeof(PublishRestSucessLog)] = new AWSLogSetting
    {
        LogStreamName = PublishingLogger.publishSuccessStreamName,
        RetryCount = PublishingLogger.publishSucessRetryCount,
        SequenceToken = PublishingLogger.publishSuccessSequenceToken,
        ConversionFunction = log => ConvertPubSuccToAudit((PublishRestSucessLog)log)
    },
    [typeof(PublishRestErrorLog)] = new AWSLogSetting
    {
        LogStreamName = PublishingLogger.publishErrorStreamName,
        RetryCount = PublishingLogger.publishErrorRetryCount,
        SequenceToken = PublishingLogger.publishErrorSequenceToken,
        ConversionFunction = log => ConvertPubErrToAudit((PublishRestErrorLog)log)
    },
};
/// <summary>
/// Converts a list of logs to CloudWatch log events and sends them in batches.
/// The log group name is read from the LOG_GROUP environment variable.
/// When no sequence token is stored for the log type, the log stream is created before anything is sent.
/// A message larger than CLOUDWATCH_MAX_LOG_SIZE bytes (UTF-8) is truncated to that size.
/// When adding an event would push the batch past CLOUDWATCH_MAX_PAYLOAD_COUNT events or
/// CLOUDWATCH_MAX_PAYLOAD_SIZE bytes, the current batch is sent and a new batch is started.
/// Every successful send returns the sequence token that is stored and passed with the next batch.
/// When a resource-not-found error other than "log group does not exist" occurs, a new stream name is
/// generated and the pending batch plus the remaining logs are re-sent while retries remain.
/// All other exceptions are logged to the system log and suppressed.
/// </summary>
/// <typeparam name="TLog">Log entity type; types without an entry in awsLogTypeSettings are ignored.</typeparam>
/// <param name="logs">Trace, Audit, System or Publish logs. Null or empty input is a no-op.</param>
/// <param name="logCount">Index of the first log that still has to be processed (used by retries).</param>
/// <param name="logBatch">Batch of already converted events that must be re-sent because of an exception.</param>
/// <returns>A task that completes when all batches were sent or the failure was handled.</returns>
public static async Task SendLogsToCloudWatch<TLog>(IEnumerable<TLog> logs, int logCount = 0, List<InputLogEvent> logBatch = null)
{
    if (logs == null)
    {
        return;
    }

    // Materialize once: the loop below needs the count and indexed access, which on a plain
    // IEnumerable would re-enumerate the source for every element.
    var entries = logs as IList<TLog> ?? logs.ToList();
    if (entries.Count == 0)
    {
        return;
    }

    // Not every bulk-insertable log type has CloudWatch settings; such types are simply not forwarded.
    AWSLogSetting logTypeSettings;
    if (!awsLogTypeSettings.TryGetValue(typeof(TLog), out logTypeSettings))
    {
        return;
    }

    // Declared outside the try block so the retry path knows how far processing got.
    var i = logCount;
    if (logBatch == null)
    {
        logBatch = new List<InputLogEvent>();
    }

    try
    {
        var logGroup = Environment.GetEnvironmentVariable("LOG_GROUP");
        int bufferSize = 0;

        // No token stored yet: create the stream first.
        if (logTypeSettings.SequenceToken == null)
        {
            await logClient.CreateLogStreamAsync(new CreateLogStreamRequest { LogGroupName = logGroup, LogStreamName = logTypeSettings.LogStreamName }).ConfigureAwait(false);
        }

        // A batch handed in by a retry goes out before any new event is converted.
        if (logBatch.Count > 0)
        {
            await FlushLogBatchAsync(logBatch, logGroup, logTypeSettings).ConfigureAwait(false);
        }

        for (i = logCount; i < entries.Count; i++)
        {
            int msgSize;
            var message = TruncateToUtf8Bytes(logTypeSettings.ConversionFunction(entries[i]), CLOUDWATCH_MAX_LOG_SIZE, out msgSize);
            var logEvent = new InputLogEvent
            {
                Message = message,
                Timestamp = DateTime.Now
            };

            // Actual event size = message bytes + the fixed per-event overhead.
            msgSize += CLOUDWATCH_CONSTRAINT;

            var batchIsFull = logBatch.Count >= CLOUDWATCH_MAX_PAYLOAD_COUNT
                || bufferSize + msgSize > CLOUDWATCH_MAX_PAYLOAD_SIZE;
            if (batchIsFull)
            {
                // If this send throws, logEvent has not been added yet; the retry resumes at index i
                // and converts it again.
                if (logBatch.Count > 0)
                {
                    await FlushLogBatchAsync(logBatch, logGroup, logTypeSettings).ConfigureAwait(false);
                }

                bufferSize = 0;
            }

            logBatch.Add(logEvent);
            bufferSize += msgSize;
        }

        if (logBatch.Count > 0)
        {
            await FlushLogBatchAsync(logBatch, logGroup, logTypeSettings).ConfigureAwait(false);
        }
    }
    catch (ResourceNotFoundException ex)
    {
        Logging.System.LogException(ex.FlattenMessage(), DateTime.UtcNow, ex.StackTrace);

        // A missing log group is only logged; any other missing resource is treated as a missing
        // log stream and retried against a freshly named stream.
        if (ex.Message != LOG_GROUP_NOTFOUND)
        {
            logTypeSettings.SequenceToken = null;
            logTypeSettings.LogStreamName = Guid.NewGuid().ToString();
            if (logTypeSettings.RetryCount > 0)
            {
                logTypeSettings.RetryCount--;

                // Re-send the batch that threw, then continue with the logs from index i onwards.
                await SendLogsToCloudWatch<TLog>(entries, i, logBatch).ConfigureAwait(false);
            }

            logTypeSettings.RetryCount = 2; // reset the retry count and proceed.
        }
    }
    catch (Exception ex)
    {
        // Forwarding is best-effort: record the failure and carry on.
        Logging.System.LogException(ex.FlattenMessage(), DateTime.UtcNow, ex.StackTrace);
    }
}

/// <summary>
/// Sends the batch to the stream named in the settings, stores the returned sequence token and empties the batch.
/// If the send throws, the batch and the stored token are left untouched so the caller can retry.
/// </summary>
private static async Task FlushLogBatchAsync(List<InputLogEvent> logBatch, string logGroup, AWSLogSetting settings)
{
    var response = await SendLogBatchAsync(logBatch, logGroup, settings.LogStreamName, settings.SequenceToken).ConfigureAwait(false);
    settings.SequenceToken = response.NextSequenceToken;
    logBatch.Clear();
}

/// <summary>
/// Returns the message unchanged when its UTF-8 encoding fits into maxBytes; otherwise returns the longest
/// prefix that fits without splitting a character or surrogate pair. byteCount receives the UTF-8 size of
/// the returned string.
/// </summary>
private static string TruncateToUtf8Bytes(string message, int maxBytes, out int byteCount)
{
    byteCount = Encoding.UTF8.GetByteCount(message);
    if (byteCount <= maxBytes)
    {
        return message;
    }

    int usedBytes = 0;
    int cut = 0;
    while (cut < message.Length)
    {
        char c = message[cut];
        int width;
        int units = 1;
        if (c < 0x80)
        {
            width = 1;
        }
        else if (c < 0x800)
        {
            width = 2;
        }
        else if (char.IsHighSurrogate(c) && cut + 1 < message.Length && char.IsLowSurrogate(message[cut + 1]))
        {
            // A surrogate pair is one code point encoded in four bytes.
            width = 4;
            units = 2;
        }
        else
        {
            // Remaining BMP characters take three bytes; an unpaired surrogate is encoded as the
            // three-byte replacement character by Encoding.UTF8.
            width = 3;
        }

        if (usedBytes + width > maxBytes)
        {
            break;
        }

        usedBytes += width;
        cut += units;
    }

    byteCount = usedBytes;
    return message.Substring(0, cut);
}
/// <summary>
/// Builds a PutLogEvents request from the arguments and submits it through logClient.
/// </summary>
/// <param name="logBatch">The log events that make up one batch.</param>
/// <param name="logGroup">Name of the CloudWatch log group the batch is sent to.</param>
/// <param name="logStreamName">Name of the log stream inside the log group.</param>
/// <param name="token">Sequence token passed along with the request.</param>
/// <returns>The task returned by the client; its result is the PutLogEvents response.</returns>
public static Task<PutLogEventsResponse> SendLogBatchAsync(List<InputLogEvent> logBatch, string logGroup, string logStreamName, string token)
{
    var request = new PutLogEventsRequest();
    request.LogEvents = logBatch;
    request.LogGroupName = logGroup;
    request.LogStreamName = logStreamName;
    request.SequenceToken = token;

    return logClient.PutLogEventsAsync(request);
}
/// <summary>
/// Copies a federation audit log into a FederationAuditAWSLogDTO and serializes it with JsonHelper.
/// </summary>
/// <param name="auditLog">The audit log entity to convert.</param>
/// <returns>The serialized DTO.</returns>
public static string ConvertToAudit(FederationAuditLog auditLog)
{
    return JsonHelper.SerializeObject(
        new FederationAuditAWSLogDTO
        {
            AuditID = auditLog.ID,
            Domain = auditLog.Domain,
            Status = auditLog.Status,
            BusinessID = auditLog.RetailerID, // the DTO calls the retailer id "BusinessID"
            Event = auditLog.Event,
            AuthorizedIntegratorID = auditLog.AuthorizedIntegratorID,
            Request = auditLog.Request,
            Response = auditLog.Response,
            Started = auditLog.Started,
            Completed = auditLog.Completed
        },
        true);
}
/// <summary>
/// Copies a federation trace log into a FederationTraceAWSLogDTO and serializes it with JsonHelper.
/// </summary>
/// <param name="traceLog">The trace log entity to convert.</param>
/// <returns>The serialized DTO.</returns>
public static string ConvertToTrace(FederationTraceLog traceLog)
{
    var dto = new FederationTraceAWSLogDTO();
    dto.ID = traceLog.ID;
    dto.Domain = traceLog.Domain;
    dto.Event = traceLog.Event;
    dto.EntityID = traceLog.EntityID;
    dto.ScopeObserverID = traceLog.ScopeObserverID;
    dto.Duration = traceLog.Duration;
    dto.LogLevel = traceLog.LogLevel;
    dto.Message = traceLog.Message;
    dto.StackTrace = traceLog.StackTrace;
    dto.LogType = traceLog.LogType;
    dto.TimeOfOccurrence = traceLog.TimeOfOccurrence;
    dto.AuditID = traceLog.FederationAuditLogID; // links the trace entry to its audit log

    return JsonHelper.SerializeObject(dto, true);
}
/// <summary>
/// Copies a system log into a SystemAWSLogDTO and serializes it with JsonHelper.
/// </summary>
/// <param name="sysLog">The system log entity to convert.</param>
/// <returns>The serialized DTO.</returns>
public static string ConvertToSystem(SystemLog sysLog)
{
    var dto = new SystemAWSLogDTO();
    dto.ID = sysLog.ID;
    dto.Duration = sysLog.Duration;
    dto.LogLevel = sysLog.LogLevel;
    dto.Message = sysLog.Message;
    dto.StackTrace = sysLog.StackTrace;
    dto.LogType = sysLog.LogType;
    dto.TimeOfOccurrence = sysLog.TimeOfOccurrence;
    dto.Domain = sysLog.Domain;

    return JsonHelper.SerializeObject(dto, true);
}
/// <summary>
/// Maps a successful publish log onto the audit DTO shape (FederationAuditAWSLogDTO) and serializes it.
/// Path, headers, URL and payload are serialized together into the DTO's Request field; the single
/// time of occurrence is used for both Started and Completed.
/// </summary>
/// <param name="restSuccessLog">The publish success log entity to convert.</param>
/// <returns>The serialized DTO.</returns>
public static string ConvertPubSuccToAudit(PublishRestSucessLog restSuccessLog)
{
    var dto = new FederationAuditAWSLogDTO();
    dto.AuditID = restSuccessLog.ID;
    dto.Domain = restSuccessLog.Domain;
    dto.Status = restSuccessLog.StatusCode;
    dto.BusinessID = restSuccessLog.RetailerID;
    dto.Event = restSuccessLog.Event;
    dto.AuthorizedIntegratorID = restSuccessLog.AuthorizedIntegratorID;
    dto.Request = JsonHelper.SerializeObject(
        new { restSuccessLog.Path, restSuccessLog.Headers, restSuccessLog.Url, restSuccessLog.Payload },
        true);
    dto.Response = restSuccessLog.Message;
    dto.Started = restSuccessLog.TimeOfOccurence;
    dto.Completed = restSuccessLog.TimeOfOccurence;

    return JsonHelper.SerializeObject(dto, true);
}
/// <summary>
/// Maps a failed publish log onto the audit DTO shape (FederationAuditAWSLogDTO) and serializes it.
/// Path, headers, URL and payload are serialized together into the DTO's Request field; the single
/// time of occurrence is used for both Started and Completed.
/// </summary>
/// <param name="restErrLog">The publish error log entity to convert.</param>
/// <returns>The serialized DTO.</returns>
public static string ConvertPubErrToAudit(PublishRestErrorLog restErrLog)
{
    return JsonHelper.SerializeObject(
        new FederationAuditAWSLogDTO
        {
            AuditID = restErrLog.ID,
            Domain = restErrLog.Domain,
            Status = restErrLog.StatusCode,
            BusinessID = restErrLog.RetailerID,
            Event = restErrLog.Event,
            AuthorizedIntegratorID = restErrLog.AuthorizedIntegratorID,
            Request = JsonHelper.SerializeObject(
                new { restErrLog.Path, restErrLog.Headers, restErrLog.Url, restErrLog.Payload },
                true),
            Response = restErrLog.Message,
            Started = restErrLog.TimeOfOccurence,
            Completed = restErrLog.TimeOfOccurence
        },
        true);
}
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment