Skip to content

Instantly share code, notes, and snippets.

@object
Last active April 25, 2024 14:21
Show Gist options
  • Save object/a3fa8db5acc42632a13059df170cf43d to your computer and use it in GitHub Desktop.
Save object/a3fa8db5acc42632a13059df170cf43d to your computer and use it in GitHub Desktop.
Test for Akka.NET cluster pool
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.Routing;
using Xunit;
using Xunit.Abstractions;
namespace Akka.Cluster.Tests.Routing;
/// <summary>
/// Builds the HOCON configuration shared by every actor system in these tests.
/// </summary>
public static class PoolConfig
{
    /// <summary>
    /// Produces a configuration that selects the cluster actor-ref provider, declares a
    /// cluster-enabled <c>broadcast-pool</c> router deployment at <c>/echo</c>, binds
    /// dot-netty TCP remoting to <c>localhost</c> on <paramref name="port"/>, and points
    /// the node at a single seed node on <paramref name="seedPort"/>.
    /// </summary>
    /// <param name="systemName">Actor system name embedded in the seed-node address.</param>
    /// <param name="seedPort">Port of the seed node on <c>localhost</c>.</param>
    /// <param name="port">Port this node's remoting transport is configured with.</param>
    /// <returns>The configuration obtained by converting the HOCON text to a <see cref="Config"/>.</returns>
    public static Config Get(string systemName, int seedPort, int port)
    {
        // The text lines are kept flush with the closing delimiter so the resulting
        // string carries no leading whitespace on any line.
        Config hocon = $$"""
        akka
        {
        actor
        {
        provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
        deployment
        {
        /echo
        {
        router = broadcast-pool
        cluster
        {
        enabled = on
        max-nr-of-instances-per-node = 2
        max-total-nr-of-instances = 6
        allow-local-routees = on
        use-role = Upload
        }
        }
        }
        }
        remote
        {
        dot-netty.tcp
        {
        public-hostname = localhost
        hostname = localhost
        port = {{port}}
        }
        }
        cluster
        {
        roles = ["Upload"]
        seed-nodes = [ "akka.tcp://{{systemName}}@localhost:{{seedPort}}" ]
        min-nr-of-members = 3
        }
        }
        """;

        return hocon;
    }
}
/// <summary>
/// Routee used by the pool tests: for every <c>"ping"</c> it receives, it logs the event and
/// sends <c>"pong"</c> to the actor reference supplied at construction time.
/// </summary>
internal sealed class EchoActor : UntypedActor
{
    private readonly UniqueAddress _nodeAddress;
    private readonly ILoggingAdapter _logger;
    private readonly IActorRef _replyTo;

    /// <summary>Creates the actor.</summary>
    /// <param name="testActor">Actor that receives a <c>"pong"</c> for each <c>"ping"</c>.</param>
    public EchoActor(IActorRef testActor)
    {
        // Captured once so the log line can show which cluster node handled the ping.
        _nodeAddress = Cluster.Get(Context.System).SelfUniqueAddress;
        _logger = Context.GetLogger();
        _replyTo = testActor;
    }

    /// <summary>
    /// Handles <c>"ping"</c>; every other message is passed to <c>Unhandled</c>.
    /// </summary>
    /// <param name="message">The incoming message.</param>
    protected override void OnReceive(object message)
    {
        if (message is not "ping")
        {
            Unhandled(message);
            return;
        }

        _logger.Info("Received ping on {0} from {1}", _nodeAddress, Sender);
        // The reply goes to the actor given in the constructor, not to Sender.
        _replyTo.Tell("pong");
    }
}
/// <summary>
/// Broadcast-pool test in which the router is created on the TestKit-owned actor system
/// (<c>Sys</c>). <c>Sys</c> is configured on <see cref="SeedPort"/>, i.e. the address every
/// node lists as its seed node; two additional systems are started on the next two ports.
/// </summary>
public class ClusterBroadcastPool1 : TestKit.Xunit2.TestKit
{
    private const string SystemName = "cluster-system-1";
    private const int SeedPort = 5000;

    // Number of "pong" replies awaited after one broadcast "ping". Matches the
    // max-total-nr-of-instances = 6 setting in PoolConfig.
    private const int ExpectedPongs = 6;

    private readonly Cluster _cluster;
    private readonly ActorSystem _system2;
    private readonly ActorSystem _system3;

    /// <summary>
    /// Starts the TestKit system on the seed port plus two more systems with the same name,
    /// and routes the extra systems' logging through the TestKit logger with a prefix.
    /// </summary>
    /// <param name="output">xUnit output helper forwarded to the TestKit base class.</param>
    public ClusterBroadcastPool1(ITestOutputHelper output)
        : base(PoolConfig.Get(SystemName, SeedPort, SeedPort), SystemName, output)
    {
        _system2 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 1));
        _system3 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 2));
        _cluster = Cluster.Get(Sys);
        InitializeLogger(_system2, "[SYS-2]");
        InitializeLogger(_system3, "[SYS-3]");
    }

    /// <summary>
    /// Waits for the member-up callback on <c>Sys</c>, creates the <c>/echo</c> pool on
    /// <c>Sys</c>, broadcasts one <c>"ping"</c> and expects exactly
    /// <see cref="ExpectedPongs"/> <c>"pong"</c> replies on the TestKit test actor.
    /// </summary>
    [Fact]
    public async Task BroadcastPoolTest_ClusterFromTestKit()
    {
        try
        {
            // RunContinuationsAsynchronously keeps the rest of this test from running inline
            // on the thread that invokes the member-up callback; TrySetResult cannot throw
            // if the callback were ever invoked more than once.
            var memberUp = new TaskCompletionSource<Done>(TaskCreationOptions.RunContinuationsAsynchronously);
            _cluster.RegisterOnMemberUp(() => { memberUp.TrySetResult(Done.Instance); });
            await memberUp.Task;

            var propsWithRouter = Props.Create(() => new EchoActor(TestActor)).WithRouter(FromConfig.Instance);
            var pool = Sys.ActorOf(propsWithRouter, "echo");

            // wait until cluster pool stabilizes
            await Task.Delay(1000);

            pool.Tell("ping");

            for (var received = 0; received < ExpectedPongs; received++)
            {
                await ExpectMsgAsync("pong");
            }

            // No further replies may arrive once the expected number has been seen.
            await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
        }
        finally
        {
            // Only the manually created systems are terminated here; Sys is not touched.
            await Task.WhenAll(
                _system2.Terminate(),
                _system3.Terminate());
        }
    }
}
/// <summary>
/// Broadcast-pool test in which the router is created on a manually started actor system
/// (<c>_system1</c>) rather than on the TestKit-owned <c>Sys</c>. <c>Sys</c> is still
/// configured on <see cref="SeedPort"/>; three additional systems use the next three ports.
/// </summary>
public class ClusterBroadcastPool2 : TestKit.Xunit2.TestKit
{
    private const string SystemName = "cluster-system-2";
    private const int SeedPort = 5010;

    // Number of "pong" replies this test asserts. PoolConfig allows up to six routees
    // (max-total-nr-of-instances = 6), but the original test had the expectations for the
    // remaining four replies disabled, so only two are awaited here.
    private const int ExpectedPongs = 2;

    private readonly Cluster _cluster;
    private readonly ActorSystem _system1;
    private readonly ActorSystem _system2;
    private readonly ActorSystem _system3;

    /// <summary>
    /// Starts the TestKit system on the seed port plus three more systems with the same name,
    /// and routes the extra systems' logging through the TestKit logger with a prefix.
    /// </summary>
    /// <param name="output">xUnit output helper forwarded to the TestKit base class.</param>
    public ClusterBroadcastPool2(ITestOutputHelper output)
        : base(PoolConfig.Get(SystemName, SeedPort, SeedPort), SystemName, output)
    {
        _system1 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 1));
        _system2 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 2));
        _system3 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 3));
        _cluster = Cluster.Get(_system1);
        InitializeLogger(_system1, "[SYS-1]");
        InitializeLogger(_system2, "[SYS-2]");
        InitializeLogger(_system3, "[SYS-3]");
    }

    /// <summary>
    /// Waits for the member-up callback on <c>_system1</c>, creates the <c>/echo</c> pool on
    /// <c>_system1</c>, broadcasts one <c>"ping"</c> and expects exactly
    /// <see cref="ExpectedPongs"/> <c>"pong"</c> replies on a test probe.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the method name says "ClusterFromTestKit" although the pool is created on
    /// <c>_system1</c>; the name is kept unchanged so existing test filters keep working.
    /// </remarks>
    [Fact]
    public async Task BroadcastPoolTest_ClusterFromTestKit()
    {
        try
        {
            // RunContinuationsAsynchronously keeps the rest of this test from running inline
            // on the thread that invokes the member-up callback; TrySetResult cannot throw
            // if the callback were ever invoked more than once.
            var memberUp = new TaskCompletionSource<Done>(TaskCreationOptions.RunContinuationsAsynchronously);
            _cluster.RegisterOnMemberUp(() => { memberUp.TrySetResult(Done.Instance); });
            await memberUp.Task;

            // The probe comes from the TestKit, while the pool below lives in _system1.
            var probe = CreateTestProbe();
            var propsWithRouter = Props.Create(() => new EchoActor(probe)).WithRouter(FromConfig.Instance);
            var pool = _system1.ActorOf(propsWithRouter, "echo");

            // wait until cluster pool stabilizes
            await Task.Delay(1000);

            pool.Tell("ping");

            for (var received = 0; received < ExpectedPongs; received++)
            {
                await probe.ExpectMsgAsync("pong");
            }

            // No further replies may arrive once the expected number has been seen.
            await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
        }
        finally
        {
            // Only the manually created systems are terminated here; Sys is not touched.
            await Task.WhenAll(
                _system1.Terminate(),
                _system2.Terminate(),
                _system3.Terminate());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment