package me.rafaavila

import cats.effect._
import cats.implicits._
import com.whisk.docker.impl.spotify.DockerKitSpotify
import com.whisk.docker.scalatest.DockerTestKit
import org.scalatest._
import org.scalatest.time._
import fs2.kafka._
import fs2._
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class Probando extends FlatSpec with Matchers with DockerKafkaService with DockerTestKit with DockerKitSpotify {

  // Allow up to 20 seconds for futures (such as the container readiness check) to complete,
  // polling every second.
  implicit val pc: PatienceConfig = PatienceConfig(Span(20, Seconds), Span(1, Second))

  // Cats Effect instances required to run the fs2-kafka streams on the global execution context.
  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Extracts the key/value pair from a consumed record.
  def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
    IO.pure(record.key -> record.value)
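
  // `processRecord` is not exercised by the tests below; a hypothetical way to plug it into
  // the consumer (reusing the `consumerSettings` defined in the second test) would be:
  //
  //   consumerStream[IO]
  //     .using(consumerSettings)
  //     .evalTap(_.subscribeTo("topic"))
  //     .flatMap(_.stream)
  //     .mapAsync(25)(message => processRecord(message.record))
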
"kafka container" should "be ready" in { |
|
isContainerReady(kafkaContainer).futureValue shouldBe true |
|
} |
|
|
|
"kafka container" should "working correctly" in { |
|
|
|
val consumerSettings = |
|
ConsumerSettings[String, String] |
|
.withAutoOffsetReset(AutoOffsetReset.Earliest) |
|
.withBootstrapServers("localhost:9092") |
|
.withGroupId("group") |
|
|
|
val producerSettings = |
|
ProducerSettings[String, String] |
|
.withBootstrapServers("localhost:9092") |
|
|
|
val toProduce = (0 until 100).map(n => s"key-$n" -> s"value->$n") |
|
|
|
val produced = |
|
(for { |
|
producer <- producerStream[IO].using(producerSettings) |
|
_ <- Stream.eval(IO(producer.toString should startWith("KafkaProducer$"))) |
|
message <- Stream.chunk(Chunk.seq(toProduce).map { |
|
case passthrough @ (key, value) => |
|
ProducerMessage.one(ProducerRecord("topic", key, value), passthrough) |
|
}) |
|
batched <- Stream.eval(producer.producePassthrough(message)).buffer(toProduce.size) |
|
passthrough <- Stream.eval(batched) |
|
} yield passthrough).compile.toVector.unsafeRunSync() |
|
|
|
produced should contain theSameElementsAs toProduce |
|
info(s"produced: ${produced.mkString(",")}") |
|
|
|
Thread.sleep(10000) |
|
|
|
val consumed = |
|
consumerStream[IO] |
|
.using(consumerSettings) |
|
.evalTap(_.subscribeTo("topic")) |
|
.evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void) |
|
.flatMap(_.stream) |
|
.take(produced.size.toLong) |
|
.map(message => message.record.key -> message.record.value) |
|
.compile |
|
.toVector |
|
.unsafeRunSync |
|
|
|
consumed should contain theSameElementsAs produced |
|
info(s"consumed: ${consumed.mkString(",")}") |
|
} |
|
} |