@narma
Created March 8, 2021 17:09
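A zio-kafka (ZIO 1.x) test that exercises partition rebalancing under load. It is a fragment of a larger test suite: it assumes zio-kafka's test harness is in scope, in particular the Kafka test layer, an embedded broker reachable through EmbeddedKafka, and the randomTopic, randomGroup, produceMany and consumer helpers.
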
testM("should close old stream during rebalancing under load") {
val nrMessages = 40000
val nrPartitions = 3
import zio.stream.Stream
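
  // Starts consumer `instance` on `topic` and records, in `allAssignments`, which partitions
  // the instance currently owns: a partition is added when its partition stream starts and
  // removed again when that stream completes (i.e. when the partition is revoked).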
  def run(instance: Int, topic: String, allAssignments: Ref[Map[Int, Set[Int]]]) = {
    val subscription = Subscription.topics(topic)
    Consumer
      .subscribeAnd(subscription)
      .partitionedStream(Serde.string, Serde.string)
      .map {
        case (tp, partStream) =>
          println(s"INST $instance START CONSUMING $tp")
          Stream.fromEffect(allAssignments.update { current =>
            current.updatedWith(instance) {
              case Some(currentSet) =>
                println(s"INST $instance ADD PART ${tp.partition()}")
                Some(currentSet ++ Set(tp.partition()))
              case None =>
                println(s"INST $instance INIT PART ${tp.partition()}")
                Some(Set(tp.partition()))
            }
          }) ++
            partStream.mapM(_ => ZIO.sleep(10.millis)) ++
            Stream.fromEffect(allAssignments.update { current =>
              current.updatedWith(instance) {
                case Some(currentSet) =>
                  println(s"INST $instance REMOVE PART ${tp.partition()}")
                  Some(currentSet -- Set(tp.partition()))
                case None =>
                  println(s"INST $instance PART NONE WITH ${tp.partition()}")
                  None
              }
            })
      }
      .flattenParUnbounded()
      .runDrain
  }
  case class ValidAssignmentsNotSeen(st: String) extends RuntimeException(s"valid assignment not seen: $st")

  val waitTimeout = 8.seconds
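
  // Polls `allAssignments` every 30 ms until exactly the given instances are present, each owns
  // at least one partition, and partitions 0, 1 and 2 are each owned exactly once. Fails with
  // ValidAssignmentsNotSeen (carrying the state observed at timeout) after `waitTimeout`.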
  def checkAssignments(allAssignments: Ref[Map[Int, Set[Int]]])(instances: Set[Int]) =
    Stream
      .repeatEffectWith(allAssignments.get, Schedule.spaced(30.millis))
      .filter { state =>
        // println(s"STATE = $state")
        state.keySet == instances &&
        instances.forall(instance => state.get(instance).exists(_.nonEmpty)) &&
        state.values.toList.flatMap(_.toList).sorted == List(0, 1, 2)
      }
      .runHead
      .timeout(waitTimeout)
      .flatMap {
        case Some(_) => ZIO.unit
        case None    => allAssignments.get.flatMap(state => ZIO.fail(ValidAssignmentsNotSeen(state.toString)))
      }
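
  // Enough keyed messages to keep every partition busy while instances come and go.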
  val produced = (0 until nrMessages).map(n => s"key-$n" -> s"value->$n")
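
  // Scenario: bring up three consumer instances one at a time, checking after each step that the
  // recorded assignments cover all partitions across exactly the live instances, then shut them
  // down one at a time and check that the remaining instances take the partitions over.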
  for {
    // Produce messages on several partitions
    topic <- randomTopic
    group <- randomGroup
    _ <- Task.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
    _ <- produceMany(topic, kvs = produced)
    allAssignments <- Ref.make(Map.empty[Int, Set[Int]])
    check = checkAssignments(allAssignments)(_)
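    // Instance 0 joins alone and should end up owning all three partitions.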
    fiber0 <- run(0, topic, allAssignments)
                .provideSomeLayer[Kafka with Blocking with Clock](
                  consumer(
                    group,
                    "client1",
                    offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest)
                  )
                )
                .fork
    _ <- check(Set(0))
    _ <- Task(println("CHECK(0) done"))
    fiber1 <- run(1, topic, allAssignments)
                .provideSomeLayer[Kafka with Blocking with Clock](
                  consumer(
                    group,
                    "client1",
                    offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest)
                  )
                )
                .fork
    _ <- Task(println("CHECK(0, 1)"))
    _ <- check(Set(0, 1))
    _ <- Task(println("CHECK(0, 1) done"))
    fiber2 <- run(2, topic, allAssignments)
                .provideSomeLayer[Kafka with Blocking with Clock](
                  consumer(
                    group,
                    "client1",
                    offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest)
                  )
                )
                .fork
    _ <- Task(println("CHECK(0, 1, 2)"))
    _ <- check(Set(0, 1, 2))
    _ <- Task(println("CHECK(0, 1, 2) done"))
    _ <- fiber2.interrupt
    _ <- allAssignments.update(_ - 2)
    _ <- check(Set(0, 1))
    _ <- fiber1.interrupt
    _ <- allAssignments.update(_ - 1)
    _ <- check(Set(0))
    _ <- fiber0.interrupt
  } yield assertCompletes
}