Simple Java JUnit test of an Apache Kafka producer (works with Kafka 0.11.0.2) (see also https://github.com/asmaier/mini-kafka)
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Time;
import org.junit.Test;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;

import static org.junit.Assert.*;

/**
 * For online documentation see
 * https://github.com/apache/kafka/blob/0.10.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/admin/AdminUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/utils/ZkUtils.scala
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
 */
public class KafkaProducerIT {

    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC = "test";

    @Test
    public void producerTest() throws InterruptedException, IOException {

        // setup Zookeeper
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
        String zkConnect = ZKHOST + ":" + zkServer.port();
        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

        // setup broker
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
        brokerProps.setProperty("offsets.topic.replication.factor", "1");
        KafkaConfig config = new KafkaConfig(brokerProps);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

        // setup producer
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);

        // setup consumer
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        consumerProps.setProperty("group.id", "group0");
        consumerProps.setProperty("client.id", "consumer0");
        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.setProperty("auto.offset.reset", "earliest"); // make sure the consumer starts from the beginning of the topic
        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(TOPIC));

        // send message
        ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
        producer.send(data);
        producer.close();

        // consume message
        ConsumerRecords<Integer, byte[]> records = consumer.poll(5000);
        assertEquals(1, records.count());
        Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
        ConsumerRecord<Integer, byte[]> record = recordIterator.next();
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        assertEquals(42, (int) record.key());
        assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));

        // shut down consumer, broker and Zookeeper
        consumer.close();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}
Gradle and Kotlin example based on this implementation: a dependencies section for the Gradle build (gradle.properties), a KafkaEmbedded.kt wrapper, and a usage example, sketched below.
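The following is a minimal sketch of such a Gradle/Kotlin setup, assuming Kafka 0.11.0.2, JUnit 4 and the Gradle Kotlin DSL. The dependency coordinates, the KafkaEmbedded class name and its start()/createTopic()/stop() API are assumptions modeled on the Java test above, not the original Gradle/Kotlin code.

// build.gradle.kts -- dependencies section (coordinates and versions are assumptions)
dependencies {
    testImplementation("junit:junit:4.12")
    testImplementation("org.apache.kafka:kafka-clients:0.11.0.2")
    testImplementation("org.apache.kafka:kafka_2.11:0.11.0.2")
    // the Kafka test jars provide TestUtils, MockTime and EmbeddedZookeeper
    testImplementation("org.apache.kafka:kafka-clients:0.11.0.2:test")
    testImplementation("org.apache.kafka:kafka_2.11:0.11.0.2:test")
    testImplementation("com.101tec:zkclient:0.10")
}

// KafkaEmbedded.kt -- minimal embedded-broker wrapper (class name and API are assumptions)
import java.nio.file.Files
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.admin.RackAwareMode
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.utils.MockTime
import kafka.utils.TestUtils
import kafka.utils.ZkUtils
import kafka.zk.EmbeddedZookeeper

class KafkaEmbedded(private val host: String = "127.0.0.1", private val port: Int = 9092) {

    private lateinit var zkServer: EmbeddedZookeeper
    private lateinit var zkUtils: ZkUtils
    private lateinit var kafkaServer: KafkaServer

    val bootstrapServers: String get() = "$host:$port"

    /** Start an embedded Zookeeper and a single Kafka broker, as in the Java test above. */
    fun start() {
        zkServer = EmbeddedZookeeper()
        val zkConnect = "127.0.0.1:${zkServer.port()}"
        zkUtils = ZkUtils.apply(zkConnect, 30000, 30000, false)

        val brokerProps = Properties()
        brokerProps.setProperty("zookeeper.connect", zkConnect)
        brokerProps.setProperty("broker.id", "0")
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString())
        brokerProps.setProperty("listeners", "PLAINTEXT://$host:$port")
        brokerProps.setProperty("offsets.topic.replication.factor", "1")
        kafkaServer = TestUtils.createServer(KafkaConfig(brokerProps), MockTime())
    }

    /** Create a topic with one partition and replication factor 1. */
    fun createTopic(topic: String) {
        AdminUtils.createTopic(zkUtils, topic, 1, 1, Properties(), RackAwareMode.`Disabled$`.`MODULE$`)
    }

    /** Shut down the broker and Zookeeper. */
    fun stop() {
        kafkaServer.shutdown()
        zkUtils.close()
        zkServer.shutdown()
    }
}

// Usage sketch: the same produce/consume round trip as the Java test, driven through the wrapper
import java.nio.charset.StandardCharsets
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.Test

class KafkaEmbeddedTest {

    @Test
    fun producerTest() {
        val kafka = KafkaEmbedded()
        kafka.start()
        kafka.createTopic("test")
        try {
            val producerProps = Properties()
            producerProps.setProperty("bootstrap.servers", kafka.bootstrapServers)
            producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
            producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
            KafkaProducer<Int, ByteArray>(producerProps).use { producer ->
                producer.send(ProducerRecord("test", 42, "test-message".toByteArray(StandardCharsets.UTF_8))).get()
            }
            // consume and assert exactly as in the Java test above
        } finally {
            kafka.stop()
        }
    }
}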