Created
March 8, 2021 17:09
-
-
Save narma/b63b5ec99d0b722f7658efd7111f09c0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
testM("should close old stream during rebalancing under load") { | |
val nrMessages = 40000 | |
val nrPartitions = 3 | |
import zio.stream.Stream | |
def run(instance: Int, topic: String, allAssigments: Ref[Map[Int, Set[Int]]]) = { | |
val subscription = Subscription.topics(topic) | |
Consumer | |
.subscribeAnd(subscription) | |
.partitionedStream(Serde.string, Serde.string) | |
.map { | |
case (tp, partStream) => | |
println(s"INST $instance START CONSUMING $tp") | |
Stream.fromEffect(allAssigments.update({ current => | |
current.updatedWith(instance) { | |
case Some(currentSet) => | |
println(s"INST $instance ADD PART ${tp.partition()}") | |
Some(currentSet ++ Set(tp.partition())) | |
case None => | |
println(s"INST $instance INIT PART ${tp.partition()}") | |
Some(Set(tp.partition())) | |
} | |
})) ++ partStream.mapM(_ => ZIO.sleep(10.millis)) ++ Stream.fromEffect(allAssigments.update({ current => | |
current.updatedWith(instance) { | |
case Some(currentSet) => | |
println(s"INST $instance REMOVE PART ${tp.partition()}") | |
Some(currentSet -- Set(tp.partition())) | |
case None => | |
println(s"INST $instance PART NONE WITH ${tp.partition()}") | |
None | |
} | |
})) | |
} | |
.flattenParUnbounded() | |
.runDrain | |
} | |
case class ValidAssignmentsNotSeen(st: String) extends RuntimeException(s"valid assignment not seen: $st") | |
val waitTimeout = 8.seconds | |
def checkAssignments(allAssignments: Ref[Map[Int, Set[Int]]])(instances: Set[Int]) = | |
for { | |
_ <- Stream | |
.repeatEffectWith(allAssignments.get, Schedule.spaced(30.millis)) | |
.filter { state => | |
// println(s"STATE = $state") | |
state.keySet == instances && | |
instances.forall(instance => state.get(instance).exists(_.nonEmpty)) && state.values.toList | |
.flatMap(_.toList) | |
.sorted == List(0, 1, 2) | |
} | |
.runHead | |
.timeoutFail(ValidAssignmentsNotSeen(allAssignments.unsafeGet.toString))(waitTimeout) | |
} yield () | |
val produced = (0 until nrMessages).map(n => s"key-$n" -> s"value->$n") | |
for { | |
// Produce messages on several partitions | |
topic <- randomTopic | |
group <- randomGroup | |
_ <- Task.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) | |
_ <- produceMany(topic, kvs = produced) | |
allAssignments <- Ref.make(Map.empty[Int, Set[Int]]) | |
check = checkAssignments(allAssignments)(_) | |
fiber0 <- run(0, topic, allAssignments) | |
.provideSomeLayer[Kafka with Blocking with Clock]( | |
consumer( | |
group, | |
"client1", | |
offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest) | |
) | |
) | |
.fork | |
_ <- check(Set(0)) | |
_ <- Task(println("CHECK(0) done")) | |
fiber1 <- run(1, topic, allAssignments) | |
.provideSomeLayer[Kafka with Blocking with Clock]( | |
consumer( | |
group, | |
"client1", | |
offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest) | |
) | |
) | |
.fork | |
_ <- Task(println("CHECK(0, 1)")) | |
_ <- check(Set(0, 1)) | |
_ <- Task(println("CHECK(0, 1) done")) | |
fiber2 <- run(2, topic, allAssignments) | |
.provideSomeLayer[Kafka with Blocking with Clock]( | |
consumer( | |
group, | |
"client1", | |
offsetRetrieval = OffsetRetrieval.Auto(reset = AutoOffsetStrategy.Earliest), | |
) | |
) | |
.fork | |
_ <- Task(println("CHECK(0, 1, 2)")) | |
_ <- check(Set(0, 1, 2)) | |
_ <- Task(println("CHECK(0, 1, 2) done")) | |
_ <- fiber2.interrupt | |
_ <- allAssignments.update(_ - 2) | |
_ <- check(Set(0, 1)) | |
_ <- fiber1.interrupt | |
_ <- allAssignments.update(_ - 1) | |
_ <- check(Set(0)) | |
_ <- fiber0.interrupt | |
} yield assertCompletes | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment