Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save hveiga/77f98c6d0070150e76c6 to your computer and use it in GitHub Desktop.
Save hveiga/77f98c6d0070150e76c6 to your computer and use it in GitHub Desktop.
Not working test
/**
* Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MultiNodeClusterSpec.EndActor
object UnreachableNodeJoinsAgainSecondMultiNodeConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString(
"""
akka.remote.log-remote-lifecycle-events = off
akka.remote.transport-failure-detector.heartbeat-interval = 30s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 20s
akka.cluster.gossip-interval = 3s
akka.cluster.unreachable-nodes-reaper-interval = 15s
akka.cluster.auto-down-unreachable-after = off
akka.cluster.seed-node-timeout = 100s
akka.cluster.allow-weakly-up-members = on
akka.cluster.failure-detector.acceptable-heartbeat-pause = 20s
akka.cluster.failure-detector.threshold = 18
akka.cluster.failure-detector.heartbeat-interval = 10s
akka.cluster.failure-detector.expected-response-after = 20s
akka.cluster.failure-detector.monitored-by-nr-of-members = 3
""").withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)))
testTransport(on = true)
}
class UnreachableNodeJoinsAgainSecondMultiJvmNode1 extends UnreachableNodeJoinsAgainSecondSpec
class UnreachableNodeJoinsAgainSecondMultiJvmNode2 extends UnreachableNodeJoinsAgainSecondSpec
class UnreachableNodeJoinsAgainSecondMultiJvmNode3 extends UnreachableNodeJoinsAgainSecondSpec
class UnreachableNodeJoinsAgainSecondMultiJvmNode4 extends UnreachableNodeJoinsAgainSecondSpec
abstract class UnreachableNodeJoinsAgainSecondSpec
extends MultiNodeSpec(UnreachableNodeJoinsAgainSecondMultiNodeConfig)
with MultiNodeClusterSpec {
import UnreachableNodeJoinsAgainMultiNodeConfig._
muteMarkingAsUnreachable()
def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = {
roles.filterNot(_ == role)
}
lazy val master = second
lazy val victim = fourth
var endBarrierNumber = 0
def endBarrier(): Unit = {
endBarrierNumber += 1
enterBarrier("after_" + endBarrierNumber)
}
"A cluster of " + roles.size + " members" must {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
endBarrier()
}
"mark a node as UNREACHABLE when we pull the network and be REACHABLE after" taggedAs LongRunningTest in {
// let them send at least one heartbeat to each other after the gossip convergence
// because for new joining nodes we remove them from the failure detector when
// receive gossip
Thread.sleep(2.seconds.dilated.toMillis)
runOn(first) {
// pull network for victim node from all nodes
allBut(victim).foreach { roleName ⇒
testConductor.blackhole(victim, roleName, Direction.Both).await
}
}
enterBarrier("unplug_victim")
val allButVictim = allBut(victim, roles)
runOn(victim) {
allButVictim.foreach(markNodeAsUnavailable(_))
within(150 seconds) {
// victim becomes all alone
awaitAssert {
val members = clusterView.members
clusterView.unreachableMembers.size should ===(roles.size - 1)
}
clusterView.unreachableMembers.map(_.address) should ===((allButVictim map address).toSet)
}
}
runOn(allButVictim: _*) {
markNodeAsUnavailable(victim)
within(150 seconds) {
// victim becomes unreachable
awaitAssert {
val members = clusterView.members
clusterView.unreachableMembers.size should ===(1)
}
awaitSeenSameState(allButVictim map address: _*)
// still one unreachable
clusterView.unreachableMembers.size should ===(1)
clusterView.unreachableMembers.head.address should ===(node(victim).address)
clusterView.unreachableMembers.head.status should ===(MemberStatus.Up)
}
}
endBarrier()
runOn(first) {
// pull network for victim node from all nodes
allBut(victim).foreach { roleName ⇒
testConductor.passThrough(victim, roleName, Direction.Both).await
}
}
enterBarrier("plug_victim")
runOn(allButVictim: _*) {
//markNodeAsUnavailable(victim)
within(150 seconds) {
// victim becomes unreachable
awaitAssert {
val members = clusterView.members
clusterView.unreachableMembers.size should ===(0)
}
awaitSeenSameState(allButVictim map address: _*)
// no unreachable
clusterView.unreachableMembers.size should ===(0)
}
}
endBarrier()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment