Last active
March 23, 2022 11:16
-
-
Save asmaier/6465468 to your computer and use it in GitHub Desktop.
Simple java junit test of an apache kafka producer (works with Kafka 0.11.0.2) (see also https://github.com/asmaier/mini-kafka)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.nio.charset.StandardCharsets; | |
import java.nio.file.Files; | |
import java.util.Arrays; | |
import java.util.Iterator; | |
import java.util.Properties; | |
import org.I0Itec.zkclient.ZkClient; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.junit.Test; | |
import kafka.admin.AdminUtils; | |
import kafka.admin.RackAwareMode; | |
import kafka.server.KafkaConfig; | |
import kafka.server.KafkaServer; | |
import kafka.utils.MockTime; | |
import kafka.utils.TestUtils; | |
import org.apache.kafka.common.utils.Time; | |
import kafka.utils.ZKStringSerializer$; | |
import kafka.utils.ZkUtils; | |
import kafka.zk.EmbeddedZookeeper; | |
import static org.junit.Assert.*; | |
/** | |
* For online documentation | |
* see | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/admin/AdminUtils.scala | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/utils/ZkUtils.scala | |
* http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html | |
* http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html | |
*/ | |
public class KafkaProducerIT { | |
private static final String ZKHOST = "127.0.0.1"; | |
private static final String BROKERHOST = "127.0.0.1"; | |
private static final String BROKERPORT = "9092"; | |
private static final String TOPIC = "test"; | |
@Test | |
public void producerTest() throws InterruptedException, IOException { | |
// setup Zookeeper | |
EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); | |
String zkConnect = ZKHOST + ":" + zkServer.port(); | |
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); | |
ZkUtils zkUtils = ZkUtils.apply(zkClient, false); | |
// setup Broker | |
Properties brokerProps = new Properties(); | |
brokerProps.setProperty("zookeeper.connect", zkConnect); | |
brokerProps.setProperty("broker.id", "0"); | |
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); | |
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT); | |
brokerProps.setProperty("offsets.topic.replication.factor" , "1"); | |
KafkaConfig config = new KafkaConfig(brokerProps); | |
Time mock = new MockTime(); | |
KafkaServer kafkaServer = TestUtils.createServer(config, mock); | |
// create topic | |
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); | |
// setup producer | |
Properties producerProps = new Properties(); | |
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); | |
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); | |
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); | |
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps); | |
// setup consumer | |
Properties consumerProps = new Properties(); | |
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); | |
consumerProps.setProperty("group.id", "group0"); | |
consumerProps.setProperty("client.id", "consumer0"); | |
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); | |
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | |
consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic | |
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); | |
consumer.subscribe(Arrays.asList(TOPIC)); | |
// send message | |
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); | |
producer.send(data); | |
producer.close(); | |
// starting consumer | |
ConsumerRecords<Integer, byte[]> records = consumer.poll(5000); | |
assertEquals(1, records.count()); | |
Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); | |
ConsumerRecord<Integer, byte[]> record = recordIterator.next(); | |
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); | |
assertEquals(42, (int) record.key()); | |
assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); | |
kafkaServer.shutdown(); | |
zkClient.close(); | |
zkServer.shutdown(); | |
} | |
} |
hi @asmaier, thank you so much for this. i have used this successfully in the past. does the code work for kafka 0.10.1.0
?
recently I have been getting
java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
at kafka.server.Defaults$.<init>(KafkaConfig.scala:183)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.log.Defaults$.<init>(LogConfig.scala:35)
at kafka.log.Defaults$.<clinit>(LogConfig.scala)
at kafka.log.LogConfig$.<init>(LogConfig.scala:246)
at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797)
at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56)
at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
java.lang.NullPointerException
at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
at
KafkaConfig config = new KafkaConfig(brokerProps);
I did not change anything else. Any idea what might be wrong? Here is my maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
I updated the example right now to work with Kafka 0.11.0.2. I also increased the poll timeout to 5000ms. As dependencies I use now
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
Gradle and Kotlin example based on this implementation.
gradle.properties
dependencies section:
ver = [junit: '4.12', kafka: '1.1.1']
testImplementation "junit:junit:$ver.junit"
implementation "org.apache.kafka:kafka-clients:$ver.kafka"
testImplementation "org.apache.kafka:kafka-clients:$ver.kafka:test"
testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka"
testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka:test"
KafkaEmbedded.kt
class KafkaEmbedded(port: Int, topic: String) : Closeable {
private val server: KafkaServer
private val zkClient: ZkClient
private val zkServer: EmbeddedZookeeper
init {
zkServer = EmbeddedZookeeper()
val zkConnect = "127.0.0.1:${zkServer.port()}"
val props = Properties()
props.setProperty("zookeeper.connect", zkConnect)
props.setProperty("broker.id", "0")
props.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString())
props.setProperty("listeners", "PLAINTEXT://127.0.0.1:$port")
props.setProperty("offsets.topic.replication.factor", "1")
server = KafkaServer(KafkaConfig(props), Time.SYSTEM, Option.apply("kafka-broker"), JavaConversions.asScalaBuffer(emptyList()))
server.startup()
zkClient = ZkClient(zkConnect, 30000, 30000, `ZKStringSerializer$`.`MODULE$`)
val zkUtils = ZkUtils.apply(zkClient, false)
AdminUtils.createTopic(zkUtils, topic, 1, 1, Properties(), RackAwareMode.`Disabled$`.`MODULE$`)
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(listOf(server)), topic, 0, 5000);
}
override fun close() {
server.shutdown()
server.awaitShutdown()
zkClient.close()
zkServer.shutdown()
}
}
Usage example:
KafkaEmbedded(12345, "test").use {
// produce-consume-assert
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am also getting same assertion error during
Same as @srinidhis94.
I tried what @an247 suggested by increasing poll timeout. But not solved my problem