Skip to content

Instantly share code, notes, and snippets.

@itn3000
Last active February 16, 2022 06:31
Show Gist options
  • Save itn3000/ba4ca3474eb4527d502e8e1606f143ec to your computer and use it in GitHub Desktop.
Save itn3000/ba4ca3474eb4527d502e8e1606f143ec to your computer and use it in GitHub Desktop.
opentelemetry-dotnet FasterLog exporter(test implementation)
using OpenTelemetry.Trace;
using OpenTelemetry.Resources;
using OpenTelemetry;
using FASTER.core;
using System.Text.Json;
using System.Buffers;
using System.Diagnostics;
using OpenTelemetry.Metrics;
namespace opentelemetryfltest
{
class FasterExporter<T> : BaseExporter<T> where T : class
{
record struct FasterLogRecord(T Data, IEnumerable<KeyValuePair<string, object>> Attributes);
FasterLog _LogStorage;
public FasterExporter(FasterLog storage)
{
_LogStorage = storage;
}
public override ExportResult Export(in Batch<T> batch)
{
var resource = this.ParentProvider.GetResource();
long lastEnqueueId = -1;
foreach (var item in batch)
{
// stored as utf-8 json string
using var mstm = new MemoryStream();
JsonSerializer.Serialize(mstm, new FasterLogRecord(item, resource != Resource.Empty ? resource.Attributes : Array.Empty<KeyValuePair<string, object>>()));
lastEnqueueId = _LogStorage.Enqueue(mstm.ToArray());
}
if (lastEnqueueId >= 0)
{
_LogStorage.Commit();
}
return ExportResult.Success;
}
}
static class TraceProviderExtensions
{
public static TracerProviderBuilder AddFasterLogExporter(this TracerProviderBuilder builder, FasterLog logStorage)
{
return builder.AddProcessor(new SimpleActivityExportProcessor(new FasterExporter<Activity>(logStorage)));
}
}
}
using OpenTelemetry.Trace;
using OpenTelemetry.Resources;
using OpenTelemetry;
using FASTER.core;
using System.Text.Json;
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using OpenTelemetry.Metrics;
namespace opentelemetryfltest;
public class MetricRecordBase
{
public string Name { get; }
public string MeterName { get; }
public string Description { get; }
public string Unit { get; }
public KeyValuePair<string, object?>[] Attributes { get; }
public MetricRecordBase(Metric metric, KeyValuePair<string, object?>[] attributes)
{
Name = metric.Name;
MeterName = metric.Meter.Name;
Description = metric.Description;
Unit = metric.Unit;
Attributes = attributes;
}
}
public class HistogramMetricRecord : MetricRecordBase
{
public HistogramMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes)
{ }
public List<HistogramMetricPoint> Points { get; } = new List<HistogramMetricPoint>();
}
public class TagElement
{
public string Key { get; }
public string? Value { get; }
public TagElement(string key, string? value)
{
Key = key;
Value = value;
}
}
public class MetricPointRecordBase
{
public DateTimeOffset StartTime { get; }
public DateTimeOffset EndTime { get; }
public TagElement[] Tags { get; }
public MetricPointRecordBase()
{
StartTime = DateTimeOffset.MinValue;
EndTime = DateTimeOffset.MinValue;
Tags = Array.Empty<TagElement>();
}
public MetricPointRecordBase(in MetricPoint point)
{
StartTime = point.StartTime;
EndTime = point.EndTime;
Tags = new TagElement[point.Tags.Count];
int count = 0;
foreach(var item in point.Tags)
{
Tags[count] = new TagElement(item.Key, item.Value != null ? item.Value.ToString() : null);
// Tags[item.Key] = item.Value != null ? item.Value.ToString() : null;
}
}
}
public class HistogramMetricPoint : MetricPointRecordBase
{
public HistogramMetricPoint(): base()
{
Count = 0;
Sum = 0;
}
public HistogramMetricPoint(in MetricPoint point, long count, double sum) : base(in point)
{
Count = count;
Sum = sum;
}
public long Count { get; }
public double Sum { get; }
public List<HistogramBucket> Buckets = new List<HistogramBucket>();
}
public class SumMetricRecord<T> : MetricRecordBase where T : struct
{
public SumMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes)
{ }
public List<SumMetricPoint<T>> Sums { get; } = new();
}
public class SumMetricPoint<T> : MetricPointRecordBase where T : struct
{
public SumMetricPoint(): base()
{
Sum = default;
}
public SumMetricPoint(in MetricPoint point, T sum) : base(point)
{
Sum = sum;
}
public T Sum { get; }
}
public class GaugeMetricRecord<T> : MetricRecordBase where T : struct
{
public GaugeMetricRecord(Metric metric, KeyValuePair<string, object?>[] attributes) : base(metric, attributes)
{
}
public List<GaugeMetricPoint<T>> LastValues = new();
}
public class GaugeMetricPoint<T> : MetricPointRecordBase where T : struct
{
public GaugeMetricPoint(): base()
{
LastValue = default;
}
public GaugeMetricPoint(in MetricPoint point, T lastValue) : base(point)
{
LastValue = lastValue;
}
public T LastValue;
}
class FasterLogMetricExporter : BaseExporter<Metric>
{
FasterLog _LogStorage;
public FasterLogMetricExporter(FasterLog storage)
{
_LogStorage = storage;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
long EnqueueRecord<T>(T metricRecord)
{
using var mstm = new MemoryStream();
JsonSerializer.Serialize(mstm, metricRecord);
return _LogStorage.Enqueue(mstm.ToArray());
}
public override ExportResult Export(in Batch<Metric> batch)
{
var resource = this.ParentProvider.GetResource();
var attributes = resource.Attributes.ToArray();
long lastEnqueueId = -1;
foreach (var metric in batch)
{
var metricType = metric.MetricType;
if (metricType.IsHistogram())
{
var metricRecord = new HistogramMetricRecord(metric, attributes);
foreach (ref readonly var pt in metric.GetMetricPoints())
{
var mtpoint = new HistogramMetricPoint(pt, pt.GetHistogramCount(), pt.GetHistogramSum());
foreach (var bucket in pt.GetHistogramBuckets())
{
mtpoint.Buckets.Add(bucket);
}
metricRecord.Points.Add(mtpoint);
}
lastEnqueueId = EnqueueRecord(metricRecord);
}
else if (metricType.IsSum())
{
if (metricType.IsLong())
{
var metricRecord = new SumMetricRecord<long>(metric, attributes);
foreach (ref readonly var pt in metric.GetMetricPoints())
{
metricRecord.Sums.Add(new SumMetricPoint<long>(pt, pt.GetSumLong()));
}
lastEnqueueId = EnqueueRecord(metricRecord);
}
else if (metricType.IsDouble())
{
var metricRecord = new SumMetricRecord<double>(metric, attributes);
foreach (ref readonly var pt in metric.GetMetricPoints())
{
metricRecord.Sums.Add(new SumMetricPoint<double>(pt, pt.GetSumDouble()));
}
lastEnqueueId = EnqueueRecord(metricRecord);
}
}
else if (metric.MetricType.IsGauge())
{
if (metricType.IsLong())
{
var metricRecord = new GaugeMetricRecord<long>(metric, attributes);
foreach (ref readonly var pt in metric.GetMetricPoints())
{
metricRecord.LastValues.Add(new GaugeMetricPoint<long>(pt, pt.GetGaugeLastValueLong()));
}
lastEnqueueId = EnqueueRecord(metricRecord);
}
else if (metricType.IsDouble())
{
var metricRecord = new GaugeMetricRecord<double>(metric, attributes);
foreach (ref readonly var pt in metric.GetMetricPoints())
{
metricRecord.LastValues.Add(new GaugeMetricPoint<double>(pt, pt.GetGaugeLastValueDouble()));
}
lastEnqueueId = EnqueueRecord(metricRecord);
}
}
}
if (lastEnqueueId >= 0)
{
_LogStorage.Commit();
}
return ExportResult.Success;
}
}
public static class MeterProviderExtensions
{
// need opentelemetry 1.2.0 or later
// use BaseExportingMetricReader if export metric manually
public static MeterProviderBuilder AddFasterLogExporter(this MeterProviderBuilder builder, FasterLog logStorage, int exportInterval = 0, int exportTimeout = 30000)
{
MetricReader reader = exportInterval switch
{
0 => new BaseExportingMetricReader(new FasterLogMetricExporter(logStorage)),
> 0 => new PeriodicExportingMetricReader(new FasterLogMetricExporter(logStorage), exportInterval, exportTimeout),
_ => throw new ArgumentException("export interval must be >= 0", "exportInterval"),
};
return builder.AddReader(reader);
}
}
using FASTER.core;
using System.Diagnostics;
using OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Metrics;
using opentelemetryfltest;
using System.Text;
using System.Diagnostics.Metrics;
await TestActivity();
await TestMetric();
async Task TestMetric()
{
const string MeterSourceName = "mymetersource";
const string MyCounterName = "mycounter";
using var MyMetric = new Meter(MeterSourceName);
var MyCounter = MyMetric.CreateCounter<int>(MyCounterName, "unit", "desc");
using var device = Devices.CreateLogDevice("metriclog");
using var logStorage = new FasterLog(new FasterLogSettings()
{
LogDevice = device,
SegmentSizeBits = 20,
PageSizeBits = 10,
MemorySizeBits = 15,
});
using var provider = Sdk.CreateMeterProviderBuilder()
.AddFasterLogExporter(logStorage, 1000)
.AddMeter(MeterSourceName)
.Build()
;
for (int i = 0; i < 10000; i++)
{
MyCounter.Add(i, new KeyValuePair<string, object?> ("tag1", 1));
if((i & 0xff) == 0)
{
await Task.Delay(100).ConfigureAwait(false);
}
}
provider.Shutdown();
logStorage.Commit();
using var iter = logStorage.Scan(logStorage.BeginAddress, logStorage.TailAddress);
long cnt = 0;
await foreach(var item in iter.GetAsyncEnumerable())
{
var act = Encoding.UTF8.GetString(item.entry);
Console.WriteLine(act);
cnt++;
}
logStorage.TruncateUntil(logStorage.TailAddress);
logStorage.Commit();
Console.WriteLine($"{nameof(TestMetric)}, lognum is {cnt}");
}
async Task TestActivity()
{
const string ActivitySourceName = "myactivitysource";
using var MySource = new ActivitySource(ActivitySourceName);
using var device = Devices.CreateLogDevice("testlog");
using var logStorage = new FasterLog(new FasterLogSettings()
{
LogDevice = device,
SegmentSizeBits = 20,
PageSizeBits = 10,
MemorySizeBits = 15,
});
Console.WriteLine($"{logStorage.TailAddress}, {logStorage.CommittedUntilAddress}");
using var provider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(CreateResourceBuilder())
.AddSource(ActivitySourceName)
.AddFasterLogExporter(logStorage)
// .AddConsoleExporter()
.Build();
foreach (var i in Enumerable.Range(0, 100))
{
using (var act = MySource.StartActivity("act1", ActivityKind.Internal))
{
act?.AddTag("tag1", i.ToString());
}
}
Console.WriteLine($"{logStorage.BeginAddress}, {logStorage.TailAddress}");
logStorage.WaitForCommit(logStorage.TailAddress);
using var logiter = logStorage.Scan(logStorage.CommittedBeginAddress, logStorage.CommittedUntilAddress);
long cnt = 0;
await foreach (var item in logiter.GetAsyncEnumerable())
{
var act = Encoding.UTF8.GetString(item.entry);
Console.WriteLine(act);
cnt++;
}
logiter.CompleteUntil(logiter.CurrentAddress);
Console.WriteLine($"lognum is {cnt}, {logStorage.BeginAddress}, {logStorage.TailAddress}");
// truncate all logs
logStorage.TruncateUntil(logStorage.TailAddress);
logStorage.Commit(true);
}
ResourceBuilder CreateResourceBuilder()
{
return ResourceBuilder.CreateEmpty()
.AddAttributes(new[] { new KeyValuePair<string, object>("abc", "def") })
.AddTelemetrySdk();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment