-
-
Save 7c00/41a3e4098015e753a6ef4e7e7be2df0e to your computer and use it in GitHub Desktop.
Simple Java JUnit test of an Apache Kafka producer (works with Kafka 0.11.0.2); see also https://github.com/asmaier/mini-kafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.nio.charset.StandardCharsets; | |
import java.nio.file.Files; | |
import java.util.Arrays; | |
import java.util.Iterator; | |
import java.util.Properties; | |
import org.I0Itec.zkclient.ZkClient; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.junit.Test; | |
import kafka.admin.AdminUtils; | |
import kafka.admin.RackAwareMode; | |
import kafka.server.KafkaConfig; | |
import kafka.server.KafkaServer; | |
import kafka.utils.MockTime; | |
import kafka.utils.TestUtils; | |
import org.apache.kafka.common.utils.Time; | |
import kafka.utils.ZKStringSerializer$; | |
import kafka.utils.ZkUtils; | |
import kafka.zk.EmbeddedZookeeper; | |
import static org.junit.Assert.*; | |
/** | |
* For online documentation | |
* see | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/admin/AdminUtils.scala | |
* https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/utils/ZkUtils.scala | |
* http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html | |
* http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html | |
* http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html | |
*/ | |
public class KafkaProducerIT { | |
private static final String ZKHOST = "127.0.0.1"; | |
private static final String BROKERHOST = "127.0.0.1"; | |
private static final String BROKERPORT = "9092"; | |
private static final String TOPIC = "test"; | |
@Test | |
public void producerTest() throws InterruptedException, IOException { | |
// setup Zookeeper | |
EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); | |
String zkConnect = ZKHOST + ":" + zkServer.port(); | |
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); | |
ZkUtils zkUtils = ZkUtils.apply(zkClient, false); | |
// setup Broker | |
Properties brokerProps = new Properties(); | |
brokerProps.setProperty("zookeeper.connect", zkConnect); | |
brokerProps.setProperty("broker.id", "0"); | |
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); | |
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT); | |
brokerProps.setProperty("offsets.topic.replication.factor" , "1"); | |
KafkaConfig config = new KafkaConfig(brokerProps); | |
Time mock = new MockTime(); | |
KafkaServer kafkaServer = TestUtils.createServer(config, mock); | |
// create topic | |
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); | |
// setup producer | |
Properties producerProps = new Properties(); | |
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); | |
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); | |
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); | |
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps); | |
// setup consumer | |
Properties consumerProps = new Properties(); | |
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); | |
consumerProps.setProperty("group.id", "group0"); | |
consumerProps.setProperty("client.id", "consumer0"); | |
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); | |
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | |
consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic | |
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); | |
consumer.subscribe(Arrays.asList(TOPIC)); | |
// send message | |
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); | |
producer.send(data); | |
producer.close(); | |
// starting consumer | |
ConsumerRecords<Integer, byte[]> records = consumer.poll(5000); | |
assertEquals(1, records.count()); | |
Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); | |
ConsumerRecord<Integer, byte[]> record = recordIterator.next(); | |
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); | |
assertEquals(42, (int) record.key()); | |
assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); | |
kafkaServer.shutdown(); | |
zkClient.close(); | |
zkServer.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment