Skip to content

Instantly share code, notes, and snippets.

@recalde
Created November 8, 2024 14:35
Show Gist options
  • Select an option

  • Save recalde/f563de6df2b2b6bc22350a6c828b0669 to your computer and use it in GitHub Desktop.

Select an option

Save recalde/f563de6df2b2b6bc22350a6c828b0669 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Timers;
using ParquetSharp;
/// <summary>
/// Buffers log records of type <typeparamref name="TLog"/> in memory and appends them
/// to a Parquet file as row groups, either when the buffer reaches a record-count
/// threshold or when a periodic timer elapses.
/// </summary>
/// <remarks>
/// <typeparamref name="TLog"/> must expose a public <c>Timestamp</c> property of type
/// <see cref="DateTime"/>; every public property must be one of the types handled by
/// <c>CreateColumn</c> (DateTime, string, Guid, long, int). All access to the buffer and
/// to the file writer is serialized on a private lock because the timer callback runs
/// on a different thread than callers of <see cref="AddRange"/>.
/// </remarks>
/// <typeparam name="TLog">The record type; its public properties become Parquet columns.</typeparam>
public class LogAggregator<TLog> : IDisposable where TLog : class
{
    private readonly List<TLog> _records = new();
    private readonly int _threshold;
    private readonly TimeSpan _timeSpan;
    // Fully qualified so the file still compiles when System.Threading is also in scope.
    private readonly System.Timers.Timer _timer;
    private readonly PropertyInfo[] _properties;
    private readonly PropertyInfo _timestampProperty;
    private readonly ParquetFileWriter _fileWriter;
    private DateTime? _mostRecentTimestamp;
    private readonly object _lock = new(); // Guards _records, _mostRecentTimestamp, _fileWriter and _disposed
    private bool _disposed;

    /// <summary>
    /// Creates the aggregator, opens the Parquet file and starts the periodic flush timer.
    /// </summary>
    /// <param name="threshold">Buffered record count at which <see cref="AddRange"/> flushes to disk.</param>
    /// <param name="timeSpan">Interval of the periodic flush timer.</param>
    /// <param name="logPrefix">Prefix of the file name; the file is named <c>{logPrefix}_{yyyyMMddHH}.parquet</c> using local time.</param>
    /// <exception cref="NotSupportedException">
    /// <typeparamref name="TLog"/> has no <c>Timestamp</c> property of type <see cref="DateTime"/>,
    /// or has a property whose type cannot be mapped to a column.
    /// </exception>
    public LogAggregator(int threshold, TimeSpan timeSpan, string logPrefix)
    {
        _threshold = threshold;
        _timeSpan = timeSpan;

        // Retrieve and order properties of TLog with Timestamp and Host first if they exist
        _properties = typeof(TLog).GetProperties()
            .OrderBy(p => p.Name == "Timestamp" ? 0 : p.Name == "Host" ? 1 : 2)
            .ToArray();

        // Resolve the Timestamp accessor once instead of on every comparison, and fail
        // fast: without it AddRange could never succeed.
        _timestampProperty = typeof(TLog).GetProperty("Timestamp");
        if (_timestampProperty == null || _timestampProperty.PropertyType != typeof(DateTime))
        {
            throw new NotSupportedException(
                $"Type {typeof(TLog)} must expose a public DateTime property named Timestamp");
        }

        // Build the schema before acquiring any resource so an unsupported property
        // type cannot leak a timer or a half-open file.
        var columns = _properties.Select(CreateColumn).ToArray();

        // Initialize (but do not yet start) the timer that triggers periodic writes
        _timer = new System.Timers.Timer(_timeSpan.TotalMilliseconds);
        _timer.Elapsed += (sender, args) => WriteToDisk();

        try
        {
            // Initialize a Parquet file with a timestamp-based name and prefix.
            // Key/value metadata has to be supplied when the writer is created.
            string fileName = $"{logPrefix}_{DateTime.Now:yyyyMMddHH}.parquet";
            _fileWriter = new ParquetFileWriter(fileName, columns, keyValueMetadata: BuildKeyValueMetadata());
        }
        catch
        {
            _timer.Dispose();
            throw;
        }

        // Start last so the callback can never observe a partially constructed instance
        _timer.Start();
    }

    /// <summary>
    /// The greatest <c>Timestamp</c> seen across all records passed to <see cref="AddRange"/>,
    /// or <c>null</c> if no record has been added yet.
    /// </summary>
    public DateTime? MostRecentTimestamp
    {
        get
        {
            // DateTime? is larger than a machine word, so read it under the lock to avoid torn reads.
            lock (_lock)
            {
                return _mostRecentTimestamp;
            }
        }
    }

    /// <summary>
    /// Adds records to the buffer, keeps the buffer ordered by <c>Timestamp</c>, and
    /// flushes to disk when the buffered count reaches the threshold.
    /// </summary>
    /// <param name="newRecords">Records to buffer; an empty sequence is a no-op.</param>
    /// <exception cref="ArgumentNullException"><paramref name="newRecords"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The aggregator has been disposed.</exception>
    public void AddRange(IEnumerable<TLog> newRecords)
    {
        if (newRecords == null)
        {
            throw new ArgumentNullException(nameof(newRecords));
        }

        // Materialize once: the sequence is enumerated for both Max and AddRange.
        var recordsList = newRecords.ToList();
        if (recordsList.Count == 0)
        {
            return; // Max() would throw on an empty sequence
        }

        var latestTimestamp = recordsList.Max(GetTimestamp);

        lock (_lock)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(LogAggregator<TLog>));
            }

            // Update the most recent timestamp based on the new records
            if (!_mostRecentTimestamp.HasValue || latestTimestamp > _mostRecentTimestamp.Value)
            {
                _mostRecentTimestamp = latestTimestamp;
            }

            _records.AddRange(recordsList);
            _records.Sort((x, y) => GetTimestamp(x).CompareTo(GetTimestamp(y)));

            if (_records.Count >= _threshold)
            {
                // The monitor is re-entrant, so flushing while holding the lock is safe.
                WriteToDisk();
            }
        }
    }

    /// <summary>
    /// Flushes the buffer as one row group and restarts the timer interval.
    /// Does nothing when the buffer is empty or the aggregator is disposed.
    /// </summary>
    private void WriteToDisk()
    {
        // Use a lock to prevent simultaneous writes and concurrent buffer mutation
        lock (_lock)
        {
            // A timer callback may already be queued when Dispose runs; ignore it.
            if (_disposed || _records.Count == 0)
            {
                return;
            }

            FlushBufferedRecords();

            // Reset the timer to extend the next write
            _timer.Stop();
            _timer.Start();
        }
    }

    /// <summary>
    /// Appends a row group containing every buffered record, then clears the buffer.
    /// Callers must hold <c>_lock</c>.
    /// </summary>
    private void FlushBufferedRecords()
    {
        // Append a new row group for this batch
        using var rowGroupWriter = _fileWriter.AppendRowGroup();
        foreach (var property in _properties)
        {
            WriteColumn(rowGroupWriter, property);
        }

        // Clear the records after writing to disk
        _records.Clear();
    }

    /// <summary>Reads the <c>Timestamp</c> value of a record through the cached accessor.</summary>
    private DateTime GetTimestamp(TLog record)
    {
        return (DateTime)_timestampProperty.GetValue(record);
    }

    /// <summary>Maps a property to a Parquet column of the matching element type.</summary>
    /// <exception cref="NotSupportedException">The property type has no column mapping.</exception>
    private Column CreateColumn(PropertyInfo property)
    {
        return property.PropertyType switch
        {
            Type t when t == typeof(DateTime) => new Column<DateTime>(property.Name),
            Type t when t == typeof(string) => new Column<string>(property.Name),
            Type t when t == typeof(Guid) => new Column<Guid>(property.Name),
            Type t when t == typeof(long) => new Column<long>(property.Name),
            Type t when t == typeof(int) => new Column<int>(property.Name),
            _ => throw new NotSupportedException($"Property type {property.PropertyType} is not supported")
        };
    }

    /// <summary>
    /// Writes the next column of the row group from the buffered records. The logical
    /// writer's element type must match the type the column was declared with in
    /// <see cref="CreateColumn"/>, so dispatch on the property type.
    /// </summary>
    private void WriteColumn(RowGroupWriter rowGroupWriter, PropertyInfo property)
    {
        Type type = property.PropertyType;
        if (type == typeof(DateTime))
        {
            WriteTypedColumn<DateTime>(rowGroupWriter, property);
        }
        else if (type == typeof(string))
        {
            WriteTypedColumn<string>(rowGroupWriter, property);
        }
        else if (type == typeof(Guid))
        {
            WriteTypedColumn<Guid>(rowGroupWriter, property);
        }
        else if (type == typeof(long))
        {
            WriteTypedColumn<long>(rowGroupWriter, property);
        }
        else if (type == typeof(int))
        {
            WriteTypedColumn<int>(rowGroupWriter, property);
        }
        else
        {
            throw new NotSupportedException($"Property type {property.PropertyType} is not supported");
        }
    }

    /// <summary>Collects one property's values into a typed array and writes them as a batch.</summary>
    private void WriteTypedColumn<TValue>(RowGroupWriter rowGroupWriter, PropertyInfo property)
    {
        var values = new TValue[_records.Count];
        for (int i = 0; i < values.Length; i++)
        {
            values[i] = (TValue)property.GetValue(_records[i]);
        }

        using var columnWriter = rowGroupWriter.NextColumn().LogicalWriter<TValue>();
        columnWriter.WriteBatch(values);
    }

    /// <summary>
    /// Builds the file-level key/value metadata: a "Variable" marker for each of the
    /// Host and Method properties that exists on <typeparamref name="TLog"/>.
    /// </summary>
    private Dictionary<string, string> BuildKeyValueMetadata()
    {
        var metadata = new Dictionary<string, string>();
        if (_properties.Any(p => p.Name == "Host"))
        {
            metadata["Host"] = "Variable";
        }
        if (_properties.Any(p => p.Name == "Method"))
        {
            metadata["Method"] = "Variable";
        }
        return metadata;
    }

    /// <summary>
    /// Stops the timer, writes any remaining records and finalizes the Parquet file.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_lock)
        {
            if (_disposed)
            {
                return;
            }
            _disposed = true;

            _timer.Stop();
            _timer.Dispose();

            try
            {
                // Write any remaining records to disk. This must not go through
                // WriteToDisk, which would restart the timer that was just disposed.
                if (_records.Count > 0)
                {
                    FlushBufferedRecords();
                }
            }
            finally
            {
                // Close the file writer to finalize the Parquet file, then release it
                _fileWriter.Close();
                _fileWriter.Dispose();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment