Last active
July 12, 2023 18:04
-
-
Save antonfirsov/6bcd1e2d099eef52a077dd0fab862bfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define METRICS | |
using System.Diagnostics.Metrics; | |
using BenchmarkDotNet.Attributes; | |
using System.Text; | |
using System.Diagnostics; | |
using System.Threading.Tasks.Sources; | |
using System.Net; | |
#if METRICS | |
using System.Net.Http.Metrics; | |
#endif | |
namespace EnrichmentBenchmarks | |
{ | |
[MemoryDiagnoser] | |
public abstract class EnrichmentBenchmarkBase | |
{ | |
protected HttpClient? _httpClient; | |
private readonly MeterListener _meterListener = new MeterListener(); | |
protected static readonly Uri RequestUri = new Uri("http://test/"); | |
protected virtual int GetConcurrency() => 1; | |
static EnrichmentBenchmarkBase() | |
{ | |
Console.WriteLine($"RUNTIME LOC: {typeof(object).Assembly.Location}"); | |
} | |
protected void EnableMetrics() | |
{ | |
_meterListener.InstrumentPublished = (instrument, listener) => | |
{ | |
if (instrument.Meter.Name == "System.Net.Http") | |
{ | |
Console.WriteLine($"ENABLED {instrument.Name}"); | |
listener.EnableMeasurementEvents(instrument); | |
} | |
}; | |
_meterListener.Start(); | |
} | |
protected SocketsHttpHandler CreateHandler() | |
{ | |
CountdownEvent connectCallbackLock = new CountdownEvent(GetConcurrency()); | |
return new SocketsHttpHandler | |
{ | |
UseProxy = false, | |
AllowAutoRedirect = false, | |
AutomaticDecompression = DecompressionMethods.None, | |
UseCookies = false, | |
ActivityHeadersPropagator = null, | |
PooledConnectionIdleTimeout = TimeSpan.FromDays(10), // Avoid the cleaning timer executing during the benchmark | |
ConnectCallback = (context, cancellation) => | |
{ | |
connectCallbackLock.Signal(); | |
connectCallbackLock.Wait(cancellation); | |
return new ValueTask<Stream>(new ResponseStream()); | |
} | |
}; | |
} | |
protected Task SendRequestAsync() => _httpClient!.SendAsync(new HttpRequestMessage(HttpMethod.Get, RequestUri) | |
{ | |
Version = HttpVersion.Version11, | |
VersionPolicy = HttpVersionPolicy.RequestVersionExact, | |
}); | |
[GlobalSetup(Target = "NoMetrics")] | |
public void SetupNoMetrics() | |
{ | |
Console.WriteLine("-- SetupNoMetrics --"); | |
_httpClient = new HttpClient(CreateHandler()); | |
} | |
#if METRICS | |
[GlobalSetup(Target = "WithMetrics")] | |
public void SetupMetrics() | |
{ | |
Console.WriteLine("-- SetupMetrics --"); | |
_httpClient = new HttpClient(CreateHandler()); | |
EnableMetrics(); | |
} | |
[GlobalSetup(Target = "WithMetricsAndEnrichment")] | |
public void SetupMetricsWithEnrichment() | |
{ | |
Console.WriteLine("-- SetupMetricsWithEnrichment --"); | |
_httpClient = new HttpClient(new EnrichmentHandler(CreateHandler())); | |
EnableMetrics(); | |
} | |
private sealed class EnrichmentHandler : DelegatingHandler | |
{ | |
public EnrichmentHandler(HttpMessageHandler innerHandler) : base(innerHandler) | |
{ | |
} | |
protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) | |
{ | |
HttpMetricsEnrichmentContext.AddCallback(request, Enrich); | |
return base.Send(request, cancellationToken); | |
} | |
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) | |
{ | |
HttpMetricsEnrichmentContext.AddCallback(request, Enrich); | |
return base.SendAsync(request, cancellationToken); | |
} | |
private static void Enrich(HttpMetricsEnrichmentContext context) | |
{ | |
context.AddCustomTag("x", "y"); | |
} | |
} | |
#endif | |
} | |
public class EnrichmentBenchmark_SingleRequest : EnrichmentBenchmarkBase | |
{ | |
[Benchmark] | |
public Task NoMetrics() => SendRequestAsync(); | |
#if METRICS | |
[Benchmark] | |
public Task WithMetrics() => SendRequestAsync(); | |
[Benchmark] | |
public Task WithMetricsAndEnrichment() => SendRequestAsync(); | |
#endif | |
} | |
public class EnrichmentBenchmark_ParallelRequests : EnrichmentBenchmarkBase | |
{ | |
[Params(20)] | |
public int Concurrency { get; set; } | |
protected override int GetConcurrency() => Concurrency; | |
private Task SendParallelRequestsAsync() | |
{ | |
Task[] tasks = new Task[Concurrency]; | |
for (int i = 0; i < Concurrency; i++) | |
{ | |
tasks[i] = Task.Run(SendRequestAsync); | |
} | |
return Task.WhenAll(tasks); | |
} | |
[Benchmark] | |
public Task NoMetrics() => SendParallelRequestsAsync(); | |
#if METRICS | |
[Benchmark] | |
public Task WithMetrics() => SendParallelRequestsAsync(); | |
[Benchmark] | |
public Task WithMetricsAndEnrichment() => SendParallelRequestsAsync(); | |
#endif | |
} | |
} | |
public sealed class ResponseStream : Stream, IValueTaskSource<int> | |
{ | |
private ManualResetValueTaskSourceCore<int> _waitSource = new() { RunContinuationsAsynchronously = true }; | |
private bool _writeCompleted; | |
private bool _readStarted; | |
private readonly byte[] _responseData; | |
private readonly bool _forceAsyncYield; | |
public ResponseStream(bool forceAsyncYield = true) | |
: this(DefaultResponse, forceAsyncYield) | |
{ | |
} | |
public ResponseStream(byte[] responseData, bool forceAsyncYield = true) | |
{ | |
_responseData = responseData; | |
_forceAsyncYield = forceAsyncYield; | |
} | |
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) | |
{ | |
return _forceAsyncYield | |
? ReadAsyncSlow(buffer, cancellationToken) | |
: ReadAsyncCore(buffer, cancellationToken); | |
} | |
private async ValueTask<int> ReadAsyncSlow(Memory<byte> buffer, CancellationToken cancellationToken) | |
{ | |
await Task.Yield(); | |
return await ReadAsyncCore(buffer, cancellationToken); | |
} | |
private ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancellationToken) | |
{ | |
_responseData.CopyTo(buffer.Span); | |
lock (this) | |
{ | |
if (_writeCompleted) | |
{ | |
_writeCompleted = false; | |
return new ValueTask<int>(_responseData.Length); | |
} | |
else | |
{ | |
_readStarted = true; | |
_waitSource.Reset(); | |
return new ValueTask<int>(this, _waitSource.Version); | |
} | |
} | |
} | |
public override int Read(Span<byte> buffer) | |
{ | |
Debug.Assert(_writeCompleted); | |
_writeCompleted = false; | |
_responseData.CopyTo(buffer); | |
return _responseData.Length; | |
} | |
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) | |
{ | |
lock (this) | |
{ | |
if (_readStarted) | |
{ | |
_readStarted = false; | |
_waitSource.SetResult(_responseData.Length); | |
} | |
else | |
{ | |
_writeCompleted = true; | |
} | |
} | |
return default; | |
} | |
public override void Write(ReadOnlySpan<byte> buffer) | |
{ | |
Debug.Assert(!_readStarted); | |
_writeCompleted = true; | |
} | |
public int GetResult(short token) => | |
_waitSource.GetResult(token); | |
public ValueTaskSourceStatus GetStatus(short token) => | |
_waitSource.GetStatus(token); | |
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => | |
_waitSource.OnCompleted(continuation, state, token, flags); | |
public override bool CanRead => true; | |
public override bool CanSeek => false; | |
public override bool CanWrite => true; | |
public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; | |
public override void Flush() => throw new InvalidOperationException(); | |
public override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); | |
public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); | |
public override void SetLength(long value) => throw new InvalidOperationException(); | |
public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); | |
public override long Length => throw new InvalidOperationException(); | |
public override long Position { get => throw new InvalidOperationException(); set => throw new InvalidOperationException(); } | |
private static readonly byte[] DefaultResponse = Encoding.UTF8.GetBytes(@"HTTP/1.1 200 OK | |
Content-Type: text/plain; charset=UTF-8 | |
Content-Length: 4 | |
Connection: Keep-Alive | |
test"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment