Last active
October 20, 2021 19:47
-
-
Save benstopford/49555b2962f93f6d50e3 to your computer and use it in GitHub Desktop.
Kafka Testing at its Most Simple
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.confluent.benstopford; | |
import kafka.consumer.*; | |
import kafka.javaapi.consumer.ConsumerConnector; | |
import kafka.message.MessageAndMetadata; | |
import kafka.serializer.StringDecoder; | |
import kafka.server.KafkaConfig; | |
import kafka.server.KafkaServerStartable; | |
import org.apache.curator.test.TestingServer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import static org.hamcrest.core.Is.is; | |
import static org.junit.Assert.assertThat; | |
/** | |
* Kafka testing at its most simple. | |
* You'll need the following in your pom: | |
* | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka_2.10</artifactId> | |
<version>0.8.2.1</version> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.11</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.curator</groupId> | |
<artifactId>curator-test</artifactId> | |
<version>2.8.0</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
*/ | |
public class KafkaMostBasicTest { | |
public static final String topic = "topic1-" + System.currentTimeMillis(); | |
private KafkaTestFixture server; | |
private Producer producer; | |
private ConsumerConnector consumerConnector; | |
@Before | |
public void setup() throws Exception { | |
server = new KafkaTestFixture(); | |
server.start(serverProperties()); | |
} | |
@After | |
public void teardown() throws Exception { | |
producer.close(); | |
consumerConnector.shutdown(); | |
server.stop(); | |
} | |
@Test | |
public void shouldWriteThenRead() throws Exception { | |
//Create a consumer | |
ConsumerIterator<String, String> it = buildConsumer(KafkaMostBasicTest.topic); | |
//Create a producer | |
producer = new KafkaProducer(producerProps()); | |
//send a message | |
producer.send(new ProducerRecord(KafkaMostBasicTest.topic, "message")).get(); | |
//read it back | |
MessageAndMetadata<String, String> messageAndMetadata = it.next(); | |
String value = messageAndMetadata.message(); | |
assertThat(value, is("message")); | |
} | |
private ConsumerIterator<String, String> buildConsumer(String topic) { | |
Properties props = consumerProperties(); | |
Map<String, Integer> topicCountMap = new HashMap(); | |
topicCountMap.put(topic, 1); | |
ConsumerConfig consumerConfig = new ConsumerConfig(props); | |
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); | |
Map<String, List<KafkaStream<String, String>>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null)); | |
KafkaStream<String, String> stream = consumers.get(topic).get(0); | |
return stream.iterator(); | |
} | |
private Properties consumerProperties() { | |
Properties props = new Properties(); | |
props.put("zookeeper.connect", serverProperties().get("zookeeper.connect")); | |
props.put("group.id", "group1"); | |
props.put("auto.offset.reset", "smallest"); | |
return props; | |
} | |
private Properties producerProps() { | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("request.required.acks", "1"); | |
return props; | |
} | |
private Properties serverProperties() { | |
Properties props = new Properties(); | |
props.put("zookeeper.connect", "localhost:2181"); | |
props.put("broker.id", "1"); | |
return props; | |
} | |
private static class KafkaTestFixture { | |
private TestingServer zk; | |
private KafkaServerStartable kafka; | |
public void start(Properties properties) throws Exception { | |
Integer port = getZkPort(properties); | |
zk = new TestingServer(port); | |
zk.start(); | |
KafkaConfig kafkaConfig = new KafkaConfig(properties); | |
kafka = new KafkaServerStartable(kafkaConfig); | |
kafka.startup(); | |
} | |
public void stop() throws IOException { | |
kafka.shutdown(); | |
zk.stop(); | |
zk.close(); | |
} | |
private int getZkPort(Properties properties) { | |
String url = (String) properties.get("zookeeper.connect"); | |
String port = url.split(":")[1]; | |
return Integer.valueOf(port); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for posting this! It helped me a lot!