Kafka features test using embedded kafka. / published by https://github.com/dacr/code-examples-manager #ce57d71f-05b2-461d-82b7-c634fc73d80c/387a3a2fceeb5f73245eb883de0e05fd2e09e346
// summary : Kafka features test using embedded kafka.
// keywords : scala, scalatest, kafka, embedded-kafka
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : ce57d71f-05b2-461d-82b7-c634fc73d80c
// created-on : 2018-10-10T08:45:12+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- $file
import $ivy.`org.scalatest::scalatest:3.2.9`
import $ivy.`io.github.embeddedkafka::embedded-kafka:2.8.0`
import $ivy.`org.apache.logging.log4j:log4j-api:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-core:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.14.1`
import org.scalatest._, flatspec._, matchers._, OptionValues._
import io.github.embeddedkafka._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
org.apache.logging.log4j.core.config.Configurator.setRootLevel(org.apache.logging.log4j.Level.ERROR)
class KafkaFeaturesTest extends AnyFlatSpec with should.Matchers with EmbeddedKafka {
  override val suiteName = "KafkaFeaturesTest"
  // -------------------------------------------------------------------------------------------------------------------
  "Kafka embedded" should "be able to dynamically choose an available port" in {
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      info(s"Kafka is listening on port ${actualConfig.kafkaPort} for testing purposes")
      implicit val serializer = new StringSerializer
      implicit val deserializer = new StringDeserializer
      publishToKafka("news", "Hello world !")
      consumeFirstMessageFrom("news") should include regex "(?i)hello"
    }
  }
  // -------------------------------------------------------------------------------------------------------------------
  it should "be able to create customized configuration" in {
    val size = (20 * 1024 * 1024).toString
    val brokerConfig = Map(
      "min.insync.replicas" -> "1",
      "replica.fetch.max.bytes" -> size,
      "message.max.bytes" -> size
    )
    val producerConfig = Map(
      "max.request.size" -> size
    )
    val consumerConfig = Map(
      "max.partition.fetch.bytes" -> size
    )
    val userDefinedConfig = EmbeddedKafkaConfig(
      kafkaPort = 0,
      zooKeeperPort = 0,
      customBrokerProperties = brokerConfig,
      customProducerProperties = producerConfig,
      customConsumerProperties = consumerConfig
    )
    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      val topicName = "truc"
      val topicConfig = Map(
        "retention.ms" -> "31536000000" // one year
      )
      createCustomTopic(
        topic = topicName,
        topicConfig = topicConfig,
        partitions = 1,
        replicationFactor = 1
      )
      implicit val serializer = new StringSerializer
      implicit val deserializer = new StringDeserializer
val msg = ("12345678" * (20 * 1024 * 1024 / 16)) // a 20Mb messages | |
      publishToKafka(topicName, msg)
      consumeFirstMessageFrom(topicName) should include regex "(?i)12345678"
    }
  }
  // -------------------------------------------------------------------------------------------------------------------
  it should "be possible to directly use the java API" in {
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
    implicit val serializer = new StringSerializer
    implicit val deserializer = new StringDeserializer
    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      val port = actualConfig.kafkaPort
      // TO BE CONTINUED
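      // Illustrative sketch only (not part of the original gist): one possible way to talk to the
      // embedded broker through the plain kafka-clients java API, assuming it listens on localhost
      // and reusing the "news" topic name from the first test.
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
      import java.util.Properties
      val props = new Properties()
      props.put("bootstrap.servers", s"localhost:$port")
      val producer = new KafkaProducer[String, String](props, serializer, serializer)
      try producer.send(new ProducerRecord[String, String]("news", "Hello from the java API !")).get()
      finally producer.close()
      consumeFirstMessageFrom("news") should include regex "(?i)hello"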
    }
  }
}
org.scalatest.tools.Runner.main(Array("-oDF", "-s", classOf[KafkaFeaturesTest].getName))