Last active
February 16, 2022 06:31
-
-
Save itn3000/ba4ca3474eb4527d502e8e1606f143ec to your computer and use it in GitHub Desktop.
opentelemetry-dotnet FasterLog exporter(test implementation)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using OpenTelemetry.Trace; | |
| using OpenTelemetry.Resources; | |
| using OpenTelemetry; | |
| using FASTER.core; | |
| using System.Text.Json; | |
| using System.Buffers; | |
| using System.Diagnostics; | |
| using OpenTelemetry.Metrics; | |
| namespace opentelemetryfltest | |
| { | |
| class FasterExporter<T> : BaseExporter<T> where T : class | |
| { | |
| record struct FasterLogRecord(T Data, IEnumerable<KeyValuePair<string, object>> Attributes); | |
| FasterLog _LogStorage; | |
| public FasterExporter(FasterLog storage) | |
| { | |
| _LogStorage = storage; | |
| } | |
| public override ExportResult Export(in Batch<T> batch) | |
| { | |
| var resource = this.ParentProvider.GetResource(); | |
| long lastEnqueueId = -1; | |
| foreach (var item in batch) | |
| { | |
| // stored as utf-8 json string | |
| using var mstm = new MemoryStream(); | |
| JsonSerializer.Serialize(mstm, new FasterLogRecord(item, resource != Resource.Empty ? resource.Attributes : Array.Empty<KeyValuePair<string, object>>())); | |
| lastEnqueueId = _LogStorage.Enqueue(mstm.ToArray()); | |
| } | |
| if (lastEnqueueId >= 0) | |
| { | |
| _LogStorage.Commit(); | |
| } | |
| return ExportResult.Success; | |
| } | |
| } | |
| static class TraceProviderExtensions | |
| { | |
| public static TracerProviderBuilder AddFasterLogExporter(this TracerProviderBuilder builder, FasterLog logStorage) | |
| { | |
| return builder.AddProcessor(new SimpleActivityExportProcessor(new FasterExporter<Activity>(logStorage))); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using OpenTelemetry.Trace; | |
| using OpenTelemetry.Resources; | |
| using OpenTelemetry; | |
| using FASTER.core; | |
| using System.Text.Json; | |
| using System.Buffers; | |
| using System.Diagnostics; | |
| using System.Runtime.CompilerServices; | |
| using OpenTelemetry.Metrics; | |
| namespace opentelemetryfltest; | |
| public class MetricRecordBase | |
| { | |
| public string Name { get; } | |
| public string MeterName { get; } | |
| public string Description { get; } | |
| public string Unit { get; } | |
| public KeyValuePair<string, object?>[] Attributes { get; } | |
| public MetricRecordBase(Metric metric, KeyValuePair<string, object?>[] attributes) | |
| { | |
| Name = metric.Name; | |
| MeterName = metric.Meter.Name; | |
| Description = metric.Description; | |
| Unit = metric.Unit; | |
| Attributes = attributes; | |
| } | |
| } | |
| public class HistogramMetricRecord : MetricRecordBase | |
| { | |
| public HistogramMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes) | |
| { } | |
| public List<HistogramMetricPoint> Points { get; } = new List<HistogramMetricPoint>(); | |
| } | |
| public class TagElement | |
| { | |
| public string Key { get; } | |
| public string? Value { get; } | |
| public TagElement(string key, string? value) | |
| { | |
| Key = key; | |
| Value = value; | |
| } | |
| } | |
| public class MetricPointRecordBase | |
| { | |
| public DateTimeOffset StartTime { get; } | |
| public DateTimeOffset EndTime { get; } | |
| public TagElement[] Tags { get; } | |
| public MetricPointRecordBase() | |
| { | |
| StartTime = DateTimeOffset.MinValue; | |
| EndTime = DateTimeOffset.MinValue; | |
| Tags = Array.Empty<TagElement>(); | |
| } | |
| public MetricPointRecordBase(in MetricPoint point) | |
| { | |
| StartTime = point.StartTime; | |
| EndTime = point.EndTime; | |
| Tags = new TagElement[point.Tags.Count]; | |
| int count = 0; | |
| foreach(var item in point.Tags) | |
| { | |
| Tags[count] = new TagElement(item.Key, item.Value != null ? item.Value.ToString() : null); | |
| // Tags[item.Key] = item.Value != null ? item.Value.ToString() : null; | |
| } | |
| } | |
| } | |
| public class HistogramMetricPoint : MetricPointRecordBase | |
| { | |
| public HistogramMetricPoint(): base() | |
| { | |
| Count = 0; | |
| Sum = 0; | |
| } | |
| public HistogramMetricPoint(in MetricPoint point, long count, double sum) : base(in point) | |
| { | |
| Count = count; | |
| Sum = sum; | |
| } | |
| public long Count { get; } | |
| public double Sum { get; } | |
| public List<HistogramBucket> Buckets = new List<HistogramBucket>(); | |
| } | |
| public class SumMetricRecord<T> : MetricRecordBase where T : struct | |
| { | |
| public SumMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes) | |
| { } | |
| public List<SumMetricPoint<T>> Sums { get; } = new(); | |
| } | |
| public class SumMetricPoint<T> : MetricPointRecordBase where T : struct | |
| { | |
| public SumMetricPoint(): base() | |
| { | |
| Sum = default; | |
| } | |
| public SumMetricPoint(in MetricPoint point, T sum) : base(point) | |
| { | |
| Sum = sum; | |
| } | |
| public T Sum { get; } | |
| } | |
| public class GaugeMetricRecord<T> : MetricRecordBase where T : struct | |
| { | |
| public GaugeMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes) | |
| { | |
| } | |
| public List<GaugeMetricPoint<T>> LastValues = new(); | |
| } | |
| public class GaugeMetricPoint<T> : MetricPointRecordBase where T : struct | |
| { | |
| public GaugeMetricPoint(): base() | |
| { | |
| LastValue = default; | |
| } | |
| public GaugeMetricPoint(in MetricPoint point, T lastValue) : base(point) | |
| { | |
| LastValue = lastValue; | |
| } | |
| public T LastValue; | |
| } | |
| class FasterLogMetricExporter : BaseExporter<Metric> | |
| { | |
| FasterLog _LogStorage; | |
| public FasterLogMetricExporter(FasterLog storage) | |
| { | |
| _LogStorage = storage; | |
| } | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | |
| long EnqueueRecord<T>(T metricRecord) | |
| { | |
| using var mstm = new MemoryStream(); | |
| JsonSerializer.Serialize(mstm, metricRecord); | |
| return _LogStorage.Enqueue(mstm.ToArray()); | |
| } | |
| public override ExportResult Export(in Batch<Metric> batch) | |
| { | |
| var resource = this.ParentProvider.GetResource(); | |
| var attributes = resource.Attributes.ToArray(); | |
| long lastEnqueueId = -1; | |
| foreach (var metric in batch) | |
| { | |
| var metricType = metric.MetricType; | |
| if (metricType.IsHistogram()) | |
| { | |
| var metricRecord = new HistogramMetricRecord(metric, attributes); | |
| foreach (ref readonly var pt in metric.GetMetricPoints()) | |
| { | |
| var mtpoint = new HistogramMetricPoint(pt, pt.GetHistogramCount(), pt.GetHistogramSum()); | |
| foreach (var bucket in pt.GetHistogramBuckets()) | |
| { | |
| mtpoint.Buckets.Add(bucket); | |
| } | |
| metricRecord.Points.Add(mtpoint); | |
| } | |
| lastEnqueueId = EnqueueRecord(metricRecord); | |
| } | |
| else if (metricType.IsSum()) | |
| { | |
| if (metricType.IsLong()) | |
| { | |
| var metricRecord = new SumMetricRecord<long>(metric, attributes); | |
| foreach (ref readonly var pt in metric.GetMetricPoints()) | |
| { | |
| metricRecord.Sums.Add(new SumMetricPoint<long>(pt, pt.GetSumLong())); | |
| } | |
| lastEnqueueId = EnqueueRecord(metricRecord); | |
| } | |
| else if (metricType.IsDouble()) | |
| { | |
| var metricRecord = new SumMetricRecord<double>(metric, attributes); | |
| foreach (ref readonly var pt in metric.GetMetricPoints()) | |
| { | |
| metricRecord.Sums.Add(new SumMetricPoint<double>(pt, pt.GetSumDouble())); | |
| } | |
| lastEnqueueId = EnqueueRecord(metricRecord); | |
| } | |
| } | |
| else if (metric.MetricType.IsGauge()) | |
| { | |
| if (metricType.IsLong()) | |
| { | |
| var metricRecord = new GaugeMetricRecord<long>(metric, attributes); | |
| foreach (ref readonly var pt in metric.GetMetricPoints()) | |
| { | |
| metricRecord.LastValues.Add(new GaugeMetricPoint<long>(pt, pt.GetGaugeLastValueLong())); | |
| } | |
| lastEnqueueId = EnqueueRecord(metricRecord); | |
| } | |
| else if (metricType.IsDouble()) | |
| { | |
| var metricRecord = new GaugeMetricRecord<double>(metric, attributes); | |
| foreach (ref readonly var pt in metric.GetMetricPoints()) | |
| { | |
| metricRecord.LastValues.Add(new GaugeMetricPoint<double>(pt, pt.GetGaugeLastValueDouble())); | |
| } | |
| lastEnqueueId = EnqueueRecord(metricRecord); | |
| } | |
| } | |
| } | |
| if (lastEnqueueId >= 0) | |
| { | |
| _LogStorage.Commit(); | |
| } | |
| return ExportResult.Success; | |
| } | |
| } | |
| public static class MeterProviderExtensions | |
| { | |
| // need opentelemetry 1.2.0 or later | |
| // use BaseExportingMetricReader if export metric manually | |
| public static MeterProviderBuilder AddFasterLogExporter(this MeterProviderBuilder builder, FasterLog logStorage, int exportInterval = 0, int exportTimeout = 30000) | |
| { | |
| MetricReader reader = exportInterval switch | |
| { | |
| 0 => new BaseExportingMetricReader(new FasterLogMetricExporter(logStorage)), | |
| > 0 => new PeriodicExportingMetricReader(new FasterLogMetricExporter(logStorage), exportInterval, exportTimeout), | |
| _ => throw new ArgumentException("export interval must be >= 0", "exportInterval"), | |
| }; | |
| return builder.AddReader(reader); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using FASTER.core; | |
| using System.Diagnostics; | |
| using OpenTelemetry.Trace; | |
| using OpenTelemetry; | |
| using OpenTelemetry.Resources; | |
| using OpenTelemetry.Metrics; | |
| using opentelemetryfltest; | |
| using System.Text; | |
| using System.Diagnostics.Metrics; | |
| await TestActivity(); | |
| await TestMetric(); | |
| async Task TestMetric() | |
| { | |
| const string MeterSourceName = "mymetersource"; | |
| const string MyCounterName = "mycounter"; | |
| using var MyMetric = new Meter(MeterSourceName); | |
| var MyCounter = MyMetric.CreateCounter<int>(MyCounterName, "unit", "desc"); | |
| using var device = Devices.CreateLogDevice("metriclog"); | |
| using var logStorage = new FasterLog(new FasterLogSettings() | |
| { | |
| LogDevice = device, | |
| SegmentSizeBits = 20, | |
| PageSizeBits = 10, | |
| MemorySizeBits = 15, | |
| }); | |
| using var provider = Sdk.CreateMeterProviderBuilder() | |
| .AddFasterLogExporter(logStorage, 1000) | |
| .AddMeter(MeterSourceName) | |
| .Build() | |
| ; | |
| for (int i = 0; i < 10000; i++) | |
| { | |
| MyCounter.Add(i, new KeyValuePair<string, object?> ("tag1", 1)); | |
| if((i & 0xff) == 0) | |
| { | |
| await Task.Delay(100).ConfigureAwait(false); | |
| } | |
| } | |
| provider.Shutdown(); | |
| logStorage.Commit(); | |
| using var iter = logStorage.Scan(logStorage.BeginAddress, logStorage.TailAddress); | |
| long cnt = 0; | |
| await foreach(var item in iter.GetAsyncEnumerable()) | |
| { | |
| var act = Encoding.UTF8.GetString(item.entry); | |
| Console.WriteLine(act); | |
| cnt++; | |
| } | |
| logStorage.TruncateUntil(logStorage.TailAddress); | |
| logStorage.Commit(); | |
| Console.WriteLine($"{nameof(TestMetric)}, lognum is {cnt}"); | |
| } | |
| async Task TestActivity() | |
| { | |
| const string ActivitySourceName = "myactivitysource"; | |
| using var MySource = new ActivitySource(ActivitySourceName); | |
| using var device = Devices.CreateLogDevice("testlog"); | |
| using var logStorage = new FasterLog(new FasterLogSettings() | |
| { | |
| LogDevice = device, | |
| SegmentSizeBits = 20, | |
| PageSizeBits = 10, | |
| MemorySizeBits = 15, | |
| }); | |
| Console.WriteLine($"{logStorage.TailAddress}, {logStorage.CommittedUntilAddress}"); | |
| using var provider = Sdk.CreateTracerProviderBuilder() | |
| .SetResourceBuilder(CreateResourceBuilder()) | |
| .AddSource(ActivitySourceName) | |
| .AddFasterLogExporter(logStorage) | |
| // .AddConsoleExporter() | |
| .Build(); | |
| foreach (var i in Enumerable.Range(0, 100)) | |
| { | |
| using (var act = MySource.StartActivity("act1", ActivityKind.Internal)) | |
| { | |
| act?.AddTag("tag1", i.ToString()); | |
| } | |
| } | |
| Console.WriteLine($"{logStorage.BeginAddress}, {logStorage.TailAddress}"); | |
| logStorage.WaitForCommit(logStorage.TailAddress); | |
| using var logiter = logStorage.Scan(logStorage.CommittedBeginAddress, logStorage.CommittedUntilAddress); | |
| long cnt = 0; | |
| await foreach (var item in logiter.GetAsyncEnumerable()) | |
| { | |
| var act = Encoding.UTF8.GetString(item.entry); | |
| Console.WriteLine(act); | |
| cnt++; | |
| } | |
| logiter.CompleteUntil(logiter.CurrentAddress); | |
| Console.WriteLine($"lognum is {cnt}, {logStorage.BeginAddress}, {logStorage.TailAddress}"); | |
| // truncate all logs | |
| logStorage.TruncateUntil(logStorage.TailAddress); | |
| logStorage.Commit(true); | |
| } | |
| ResourceBuilder CreateResourceBuilder() | |
| { | |
| return ResourceBuilder.CreateEmpty() | |
| .AddAttributes(new[] { new KeyValuePair<string, object>("abc", "def") }) | |
| .AddTelemetrySdk(); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment