Last active
January 12, 2024 15:10
-
-
Save dili91/343948a5e8289b617ff8c855b3a3f8d1 to your computer and use it in GitHub Desktop.
A test for concurrency behaviour on EasyNetQ while consuming messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using EasyNetQ; | |
using EasyNetQ.Consumer; | |
using EasyNetQ.Topology; | |
using Xunit.Abstractions; | |
namespace easynetq_concurrency_tests; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using FluentAssertions; | |
using Xunit; | |
using System.Text.Json; | |
/// <summary>
/// Integration tests that record the order in which EasyNetQ hands messages to a blocking handler
/// for different <c>PrefetchCount</c> / <c>ConsumerDispatcherConcurrency</c> settings.
/// </summary>
/// <remarks>
/// Every test connects to a broker at <c>localhost:2765</c> with the <c>guest</c>/<c>guest</c> account.
/// Each handled message appends a <see cref="EventTrack.ConsumingStep.Start"/> and an
/// <see cref="EventTrack.ConsumingStep.End"/> entry to the shared event history. When the history is equal
/// to its own (Id, Step)-sorted form, no two handlers overlapped and messages were handled in Id order.
/// </remarks>
public class ConcurrencyTests
{
    private const string ExchangeName = "myExchange";
    private const string QueueName = "myQueue";
    private const string RoutingKey = "*";

    /// <summary>How long every handler blocks its thread per message.</summary>
    private static readonly TimeSpan BlockingTaskDuration = TimeSpan.FromMilliseconds(100);

    /// <summary>Interval between two checks of the event history while waiting for consumption.</summary>
    private static readonly TimeSpan PollingInterval = TimeSpan.FromMilliseconds(10);

    private readonly ITestOutputHelper _testOutputHelper;
    private readonly ConcurrentQueue<EventTrack> _eventHistory = new();

    public ConcurrencyTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    /// <summary>Typed <c>Consume</c> with a synchronous handler: the recorded history must not be sequential.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Should_Consume_Concurrently(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);

        // Disposed on every exit path (including failed assertions) so that this consumer
        // cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume<TestEvent>(queue, (m, _) =>
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m.Body));

        // Act
        await PublishAllAsync(noOfMessages,
            m => bus.Advanced.PublishAsync(exchange, RoutingKey, true, new Message<TestEvent>(m)));

        // Assert that all messages were consumed concurrently
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedConcurrently(noOfMessages);
    }

    /// <summary>Raw (byte payload) <c>Consume</c> with a synchronous handler: the history must not be sequential.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_Should_Consume_Concurrently(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);

        // Disposed on every exit path so that this consumer cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(queue, (body, _, _) =>
        {
            var m = JsonSerializer.Deserialize<TestEvent>(body.Span)!;
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m);
        });

        // Act
        await PublishAllAsync(noOfMessages,
            m => bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m))));

        // Assert that all messages were consumed concurrently
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedConcurrently(noOfMessages);
    }

    /// <summary>
    /// Raw <c>Consume</c> with a Task-returning delegate that blocks synchronously: the history must be
    /// sequential even though the theory data uses the "concurrent" prefetch/dispatch settings.
    /// </summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_With_Delegate_Should_Consume_Sequentially_Even_If_PrefetchCountAndConsumerDispatchAllowConcurrency(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);

        // Disposed on every exit path so that this consumer cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(
            queue,
            new TestMessageHandlerDelegateWrapper(_testOutputHelper, _eventHistory).HandleMessageAsync,
            _ => { });

        // Act
        await PublishAllAsync(noOfMessages,
            m => bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m))));

        // Assert that all messages were consumed sequentially
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedSequentially(noOfMessages);
    }

    /// <summary>Typed <c>Consume</c> with the sequential theory data: the history must be sequential.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);

        // Disposed on every exit path so that this consumer cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume<TestEvent>(queue, (m, _) =>
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m.Body));

        // Act
        await PublishAllAsync(noOfMessages,
            m => bus.Advanced.PublishAsync(exchange, RoutingKey, true, new Message<TestEvent>(m)));

        // Assert that all messages were consumed sequentially
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedSequentially(noOfMessages);
    }

    /// <summary>Raw <c>Consume</c> with the sequential theory data: the history must be sequential.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_ConsumeAsync_Raw_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));
        var exchange = await bus.Advanced.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic);
        var queue = await bus.Advanced.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false);
        await bus.Advanced.BindAsync(exchange, queue, RoutingKey);

        // Disposed on every exit path so that this consumer cannot steal messages from subsequent test runs.
        using var consumer = bus.Advanced.Consume(queue, (body, _, _) =>
        {
            var m = JsonSerializer.Deserialize<TestEvent>(body.Span)!;
            new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(m);
        });

        // Act
        await PublishAllAsync(noOfMessages,
            m => bus.Advanced.PublishAsync(exchange, RoutingKey, true,
                properties: new MessageProperties(),
                new ReadOnlyMemory<byte>(JsonSerializer.SerializeToUtf8Bytes(m))));

        // Assert that all messages were consumed sequentially
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedSequentially(noOfMessages);
    }

    /// <summary><c>PubSub.SubscribeAsync</c> with the concurrent theory data: the history must not be sequential.</summary>
    [Theory]
    [MemberData(nameof(ConcurrentConsumptionTestData))]
    public async Task EasyNetQ_SubscribeAsync_Should_Consume_Concurrently(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));

        // A fresh subscription id per run; the subscription is released when the test ends.
        using var subscription = await bus.PubSub.SubscribeAsync<TestEvent>(Guid.NewGuid().ToString(),
            e => new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(e));

        // Act
        await PublishAllAsync(noOfMessages, m => bus.PubSub.PublishAsync(m));

        // Assert that all messages were consumed concurrently
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedConcurrently(noOfMessages);
    }

    /// <summary><c>PubSub.SubscribeAsync</c> with the sequential theory data: the history must be sequential.</summary>
    [Theory]
    [MemberData(nameof(SequentialConsumptionTestData))]
    public async Task EasyNetQ_SubscribeAsync_Should_Consume_Sequentially(ushort prefetchCount,
        ushort consumerDispatchConcurrency,
        ushort noOfMessages)
    {
        // Arrange
        var connectionConfig = CreateConnectionConfiguration(prefetchCount, consumerDispatchConcurrency);
        using var bus = RabbitHutch.CreateBus(_ => connectionConfig,
            register => register.RegisterDefaultServices(_ => connectionConfig));

        // A fresh subscription id per run; the subscription is released when the test ends.
        using var subscription = await bus.PubSub.SubscribeAsync<TestEvent>(Guid.NewGuid().ToString(),
            e => new TestMessageHandler(_testOutputHelper, _eventHistory).Handle(e));

        // Act
        await PublishAllAsync(noOfMessages, m => bus.PubSub.PublishAsync(m));

        // Assert that all messages were consumed sequentially
        await WaitUntilAllConsumedAsync(noOfMessages);
        AssertConsumedSequentially(noOfMessages);
    }

    /// <summary>Theory rows for which the tests expect overlapping (non-sequential) handling.</summary>
    public static IEnumerable<object[]> ConcurrentConsumptionTestData()
    {
        //prefetchCount, consumerDispatchConcurrency, noOfMessages
        yield return Row(50, 30, 10);
        yield return Row(50, 100, 50);
        yield return Row(50, 3, 2);
    }

    /// <summary>Theory rows for which the tests expect strictly sequential handling.</summary>
    public static IEnumerable<object[]> SequentialConsumptionTestData()
    {
        //prefetchCount, consumerDispatchConcurrency, noOfMessages
        yield return Row(1, 10, 10);
        yield return Row(10, 1, 10);
    }

    /// <summary>
    /// Builds one theory row whose values are boxed as <see cref="ushort"/>, i.e. exactly the
    /// parameter type of the theories, so no runner-side numeric conversion is needed.
    /// </summary>
    private static object[] Row(ushort prefetchCount, ushort consumerDispatchConcurrency, ushort noOfMessages) =>
        new object[] { prefetchCount, consumerDispatchConcurrency, noOfMessages };

    /// <summary>Creates the connection settings shared by all tests; only the two tuning knobs vary.</summary>
    private static ConnectionConfiguration CreateConnectionConfiguration(
        ushort prefetchCount,
        ushort consumerDispatchConcurrency) =>
        new()
        {
            Hosts = new List<HostConfiguration>() { new() { Host = "localhost", Port = 2765 } },
            VirtualHost = "/",
            ConsumerDispatcherConcurrency = consumerDispatchConcurrency,
            PrefetchCount = prefetchCount,
            UserName = "guest",
            Password = "guest",
        };

    /// <summary>
    /// Creates <paramref name="noOfMessages"/> events with ids <c>0..noOfMessages-1</c> and publishes them
    /// one after another, awaiting each publish before starting the next.
    /// </summary>
    private static async Task PublishAllAsync(ushort noOfMessages, Func<TestEvent, Task> publish)
    {
        var messages = Enumerable.Range(0, noOfMessages).Select(i => new TestEvent
        {
            Id = (ushort)i,
        }).ToList();

        foreach (var message in messages)
        {
            await publish(message);
        }
    }

    /// <summary>
    /// Waits, without blocking the test thread, until a Start and an End entry were recorded for every
    /// message, or until twice the total blocking time of all handlers has elapsed.
    /// </summary>
    private async Task WaitUntilAllConsumedAsync(int noOfMessages)
    {
        var expectedEvents = 2 * noOfMessages;
        using var timeout = new CancellationTokenSource(BlockingTaskDuration.Multiply(noOfMessages).Multiply(2));

        while (_eventHistory.Count != expectedEvents && !timeout.IsCancellationRequested)
        {
            await Task.Delay(PollingInterval);
        }
    }

    /// <summary>Asserts that every message produced exactly one Start and one End entry.</summary>
    private void AssertEveryMessageWasTracked(int noOfMessages)
    {
        _eventHistory.Count.Should().Be(2 * noOfMessages);
        _eventHistory.Count(x => x.Step.Equals(EventTrack.ConsumingStep.Start)).Should().Be(noOfMessages);
    }

    /// <summary>Asserts that the recorded order differs from the strictly sequential order.</summary>
    private void AssertConsumedConcurrently(int noOfMessages)
    {
        AssertEveryMessageWasTracked(noOfMessages);
        _eventHistory.Should().NotBeEquivalentTo(SequentialHistory(), x => x.WithStrictOrdering());
    }

    /// <summary>Asserts that the recorded order is exactly the strictly sequential order.</summary>
    private void AssertConsumedSequentially(int noOfMessages)
    {
        AssertEveryMessageWasTracked(noOfMessages);
        _eventHistory.Should().BeEquivalentTo(SequentialHistory(), x => x.WithStrictOrdering());
    }

    /// <summary>The recorded history re-ordered by message id, Start before End.</summary>
    private IEnumerable<EventTrack> SequentialHistory() =>
        _eventHistory
            .OrderBy(e => e.Event.Id)
            .ThenBy(e => e.Step);

    /// <summary>Synchronous handler that records a Start entry, blocks the thread, then records an End entry.</summary>
    public class TestMessageHandler
    {
        private readonly ITestOutputHelper _testOutputHelper;
        private readonly ConcurrentQueue<EventTrack> _eventHistory;

        public TestMessageHandler(ITestOutputHelper testOutputHelper, ConcurrentQueue<EventTrack> eventHistory)
        {
            _testOutputHelper = testOutputHelper;
            _eventHistory = eventHistory;
        }

        /// <summary>Delegate that handles a single event; a new delegate instance is created on every access.</summary>
        public Action<TestEvent> Handle => e =>
        {
            _testOutputHelper.WriteLine("Start consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.Start));

            // This delay simulates CPU intensive operations, blocking the current thread
            Thread.Sleep(BlockingTaskDuration);

            _testOutputHelper.WriteLine("End consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.End));
        };
    }

    /// <summary>
    /// Same handling as <see cref="TestMessageHandler"/>, exposed as a Task-returning method that takes the raw
    /// message body so it can be passed to the delegate-based <c>Consume</c> overload.
    /// </summary>
    public class TestMessageHandlerDelegateWrapper
    {
        private readonly ITestOutputHelper _testOutputHelper;
        private readonly ConcurrentQueue<EventTrack> _eventHistory;

        public TestMessageHandlerDelegateWrapper(ITestOutputHelper testOutputHelper, ConcurrentQueue<EventTrack> eventHistory)
        {
            _testOutputHelper = testOutputHelper;
            _eventHistory = eventHistory;
        }

        /// <summary>
        /// Deserializes the JSON body into a <see cref="TestEvent"/>, records Start, blocks for
        /// <see cref="BlockingTaskDuration"/>, records End and acknowledges the message.
        /// </summary>
        /// <remarks>
        /// The method contains no <c>await</c>: all work, including the blocking sleep, runs synchronously
        /// on the calling thread before the task is returned. <paramref name="properties"/>,
        /// <paramref name="receivedInfo"/> and <paramref name="cancellationToken"/> are not used.
        /// </remarks>
        public async Task<AckStrategy> HandleMessageAsync(
            ReadOnlyMemory<byte> body,
            MessageProperties properties,
            MessageReceivedInfo receivedInfo,
            CancellationToken cancellationToken)
        {
            var e = JsonSerializer.Deserialize<TestEvent>(body.Span)!;

            _testOutputHelper.WriteLine("Start consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.Start));

            // This delay simulates CPU intensive operations, blocking the current thread
            Thread.Sleep(BlockingTaskDuration);

            _testOutputHelper.WriteLine("End consuming message {0}", e.Id);
            _eventHistory.Enqueue(new EventTrack(e, EventTrack.ConsumingStep.End));

            return AckStrategies.Ack;
        }
    }

    /// <summary>Message payload; <see cref="Id"/> is the zero-based publish index.</summary>
    public sealed class TestEvent
    {
        public ushort Id { get; set; }
    }

    /// <summary>One entry of the event history: which event reached which handling step.</summary>
    public record EventTrack(TestEvent Event, EventTrack.ConsumingStep Step)
    {
        /// <summary>Handling step; the numeric order (Start before End) is used when sorting the history.</summary>
        public enum ConsumingStep
        {
            Start = 0, End = 1
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Shared MSBuild properties. Presumably a Directory.Build.props-style file picked up by the test project (TODO confirm file name). -->
<Project>
  <PropertyGroup>
    <!-- Enable nullable reference type annotations and warnings. -->
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <PropertyGroup>
    <!-- Package versions, referenced from PackageReference items as $(PropertyName). -->
    <EasyNetQLibraryVersion>7.8.0</EasyNetQLibraryVersion>
    <MicrosoftExtensionLibraryVersion>3.1.8</MicrosoftExtensionLibraryVersion>
    <!-- Testing libraries -->
    <FluentAssertionsLibraryVersion>6.12.0</FluentAssertionsLibraryVersion>
  </PropertyGroup>
</Project>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Test project for the EasyNetQ concurrency tests. -->
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net7.0</TargetFramework>
    <RootNamespace>easynetq_concurrency_tests</RootNamespace>
    <!-- Implicit usings are enabled for this project. -->
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <!-- The $(...LibraryVersion) properties are not defined in this file; they must come from an imported properties file. -->
    <PackageReference Include="EasyNetQ" Version="$(EasyNetQLibraryVersion)" />
    <PackageReference Include="EasyNetQ.Serialization.NewtonsoftJson" Version="$(EasyNetQLibraryVersion)" />
    <PackageReference Include="FluentAssertions" Version="$(FluentAssertionsLibraryVersion)" />
    <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftExtensionLibraryVersion)" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="$(MicrosoftExtensionLibraryVersion)" />
    <!-- Test SDK, framework and runner. -->
    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
    <PackageReference Include="xunit" Version="2.4.1" />
    <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
      <PrivateAssets>all</PrivateAssets>
    </PackageReference>
  </ItemGroup>
</Project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment