Last active
April 25, 2024 14:21
-
-
Save object/a3fa8db5acc42632a13059df170cf43d to your computer and use it in GitHub Desktop.
Test for Akka.NET cluster pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka.Actor; | |
using Akka.Configuration; | |
using Akka.Event; | |
using Akka.Routing; | |
using Xunit; | |
using Xunit.Abstractions; | |
namespace Akka.Cluster.Tests.Routing; | |
/// <summary>
/// Builds the HOCON configuration used by every node of the test clusters in this file.
/// </summary>
public static class PoolConfig
{
    /// <summary>
    /// Creates the configuration for a single cluster node bound to <c>localhost</c>.
    /// </summary>
    /// <remarks>
    /// The configuration declares a cluster-aware <c>broadcast-pool</c> router for the actor
    /// path <c>/echo</c> (at most 2 routees per node and 6 in total, restricted to the
    /// <c>Upload</c> role, local routees allowed), gives the node the <c>Upload</c> role and
    /// sets <c>min-nr-of-members</c> to 3.
    /// </remarks>
    /// <param name="systemName">Actor system name; it is embedded in the seed-node address.</param>
    /// <param name="seedPort">Port of the seed node on localhost.</param>
    /// <param name="port">Port this node's dot-netty TCP transport listens on.</param>
    /// <returns>The parsed <see cref="Config"/> for the node.</returns>
    public static Config Get(string systemName, int seedPort, int port) =>
        // Parse explicitly instead of relying on the implicit string -> Config conversion.
        ConfigurationFactory.ParseString($$"""
            akka
            {
                actor
                {
                    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
                    deployment
                    {
                        /echo
                        {
                            router = broadcast-pool
                            cluster
                            {
                                enabled = on
                                max-nr-of-instances-per-node = 2
                                max-total-nr-of-instances = 6
                                allow-local-routees = on
                                use-role = Upload
                            }
                        }
                    }
                }
                remote
                {
                    dot-netty.tcp
                    {
                        public-hostname = localhost
                        hostname = localhost
                        port = {{port}}
                    }
                }
                cluster
                {
                    roles = ["Upload"]
                    seed-nodes = [ "akka.tcp://{{systemName}}@localhost:{{seedPort}}" ]
                    min-nr-of-members = 3
                }
            }
            """);
}
/// <summary>
/// Routee used by the pool tests: for every <c>"ping"</c> it receives, it logs the ping and
/// sends <c>"pong"</c> to the actor reference supplied at construction time.
/// Any other message is passed to <c>Unhandled</c>.
/// </summary>
internal sealed class EchoActor : UntypedActor
{
    private readonly IActorRef _testActor;
    private readonly ILoggingAdapter _log;
    private readonly UniqueAddress _selfAddress;

    /// <param name="testActor">Actor that receives the <c>"pong"</c> replies.</param>
    public EchoActor(IActorRef testActor)
    {
        _testActor = testActor;
        _log = Context.GetLogger();
        // Captured once so each log line shows which cluster node handled the ping.
        _selfAddress = Cluster.Get(Context.System).SelfUniqueAddress;
    }

    protected override void OnReceive(object message)
    {
        if (message is not "ping")
        {
            Unhandled(message);
            return;
        }

        _log.Info("Received ping on {0} from {1}", _selfAddress, Sender);
        _testActor.Tell("pong");
    }
}
/// <summary>
/// Broadcast-pool test in which the router is created on the TestKit actor system
/// (<c>Sys</c>). <c>Sys</c> listens on the seed port; two additional actor systems are
/// started on the next two ports and use <c>Sys</c> as their seed node.
/// </summary>
public class ClusterBroadcastPool1 : TestKit.Xunit2.TestKit
{
    private const string SystemName = "cluster-system-1";
    private const int SeedPort = 5000;

    // Number of "pong" replies asserted for a single broadcast "ping".
    private const int ExpectedPongs = 6;

    // Upper bound for cluster formation so a cluster that never forms fails the test
    // instead of hanging it.
    private static readonly TimeSpan MemberUpTimeout = TimeSpan.FromSeconds(30);

    // Fixed pause between creating the router and sending the ping.
    private static readonly TimeSpan PoolSettleDelay = TimeSpan.FromMilliseconds(1000);

    private readonly Cluster _cluster;
    private readonly ActorSystem _system2;
    private readonly ActorSystem _system3;

    public ClusterBroadcastPool1(ITestOutputHelper output)
        : base(PoolConfig.Get(SystemName, SeedPort, SeedPort), SystemName, output)
    {
        _system2 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 1));
        _system3 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 2));
        _cluster = Cluster.Get(Sys);
        InitializeLogger(_system2, "[SYS-2]");
        InitializeLogger(_system3, "[SYS-3]");
    }

    /// <summary>
    /// Sends one <c>"ping"</c> to the pool and expects exactly <see cref="ExpectedPongs"/>
    /// <c>"pong"</c> messages on the TestKit's test actor, followed by one second of silence.
    /// </summary>
    [Fact]
    public async Task BroadcastPoolTest_ClusterFromTestKit()
    {
        try
        {
            await WaitForMemberUpAsync();

            var propsWithRouter = Props.Create(() => new EchoActor(TestActor)).WithRouter(FromConfig.Instance);
            var pool = Sys.ActorOf(propsWithRouter, "echo");

            // wait until cluster pool stabilizes
            await Task.Delay(PoolSettleDelay);

            pool.Tell("ping");
            for (var received = 0; received < ExpectedPongs; received++)
            {
                await ExpectMsgAsync("pong");
            }

            await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
        }
        finally
        {
            // The extra systems are not owned by the TestKit, so shut them down here.
            await Task.WhenAll(
                _system2.Terminate(),
                _system3.Terminate());
        }
    }

    /// <summary>
    /// Completes once the member-up callback registered on <see cref="_cluster"/> fires;
    /// fails the test if that does not happen within <see cref="MemberUpTimeout"/>.
    /// </summary>
    private async Task WaitForMemberUpAsync()
    {
        // RunContinuationsAsynchronously keeps the rest of the test from running inline on
        // whichever thread invokes the cluster callback.
        var memberUp = new TaskCompletionSource<Done>(TaskCreationOptions.RunContinuationsAsynchronously);
        _cluster.RegisterOnMemberUp(() => memberUp.TrySetResult(Done.Instance));

        var finished = await Task.WhenAny(memberUp.Task, Task.Delay(MemberUpTimeout));
        Assert.True(
            ReferenceEquals(finished, memberUp.Task),
            $"Cluster member did not come up within {MemberUpTimeout}.");
    }
}
/// <summary>
/// Broadcast-pool test in which the router is created on a separately started actor system
/// (<c>_system1</c>) rather than on the TestKit system. <c>Sys</c> listens on the seed port;
/// three additional actor systems are started on the next three ports and use <c>Sys</c> as
/// their seed node. Replies are collected by a test probe created from the TestKit.
/// </summary>
public class ClusterBroadcastPool2 : TestKit.Xunit2.TestKit
{
    private const string SystemName = "cluster-system-2";
    private const int SeedPort = 5010;

    // Number of "pong" replies asserted for a single broadcast "ping".
    // NOTE(review): the original test had four further "pong" expectations commented out,
    // i.e. six were apparently intended but only two are asserted. Presumably this records
    // the behaviour under investigation — confirm before changing the count.
    private const int ExpectedPongs = 2;

    // Upper bound for cluster formation so a cluster that never forms fails the test
    // instead of hanging it.
    private static readonly TimeSpan MemberUpTimeout = TimeSpan.FromSeconds(30);

    // Fixed pause between creating the router and sending the ping.
    private static readonly TimeSpan PoolSettleDelay = TimeSpan.FromMilliseconds(1000);

    private readonly Cluster _cluster;
    private readonly ActorSystem _system1;
    private readonly ActorSystem _system2;
    private readonly ActorSystem _system3;

    public ClusterBroadcastPool2(ITestOutputHelper output)
        : base(PoolConfig.Get(SystemName, SeedPort, SeedPort), SystemName, output)
    {
        _system1 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 1));
        _system2 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 2));
        _system3 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 3));
        _cluster = Cluster.Get(_system1);
        InitializeLogger(_system1, "[SYS-1]");
        InitializeLogger(_system2, "[SYS-2]");
        InitializeLogger(_system3, "[SYS-3]");
    }

    /// <summary>
    /// Sends one <c>"ping"</c> to the pool hosted on <c>_system1</c> and expects exactly
    /// <see cref="ExpectedPongs"/> <c>"pong"</c> messages on the probe, followed by one
    /// second of silence.
    /// </summary>
    [Fact]
    public async Task BroadcastPoolTest_ClusterFromTestKit()
    {
        try
        {
            await WaitForMemberUpAsync();

            var probe = CreateTestProbe();
            var propsWithRouter = Props.Create(() => new EchoActor(probe)).WithRouter(FromConfig.Instance);
            var pool = _system1.ActorOf(propsWithRouter, "echo");

            // wait until cluster pool stabilizes
            await Task.Delay(PoolSettleDelay);

            pool.Tell("ping");
            for (var received = 0; received < ExpectedPongs; received++)
            {
                await probe.ExpectMsgAsync("pong");
            }

            await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
        }
        finally
        {
            // The extra systems are not owned by the TestKit, so shut them down here.
            await Task.WhenAll(
                _system1.Terminate(),
                _system2.Terminate(),
                _system3.Terminate());
        }
    }

    /// <summary>
    /// Completes once the member-up callback registered on <see cref="_cluster"/> fires;
    /// fails the test if that does not happen within <see cref="MemberUpTimeout"/>.
    /// </summary>
    private async Task WaitForMemberUpAsync()
    {
        // RunContinuationsAsynchronously keeps the rest of the test from running inline on
        // whichever thread invokes the cluster callback.
        var memberUp = new TaskCompletionSource<Done>(TaskCreationOptions.RunContinuationsAsynchronously);
        _cluster.RegisterOnMemberUp(() => memberUp.TrySetResult(Done.Instance));

        var finished = await Task.WhenAny(memberUp.Task, Task.Delay(MemberUpTimeout));
        Assert.True(
            ReferenceEquals(finished, memberUp.Task),
            $"Cluster member did not come up within {MemberUpTimeout}.");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment