Created
February 28, 2021 18:51
-
-
Save krasowskir/7193b7e7c4715cac2d6431a41c7cb4bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package servicetests | |
import com.example.kafkaFirstConsumer.DemoApplication | |
import com.example.kafkaFirstConsumer.service.MyAckConsumer | |
import org.apache.kafka.clients.consumer.Consumer | |
import org.apache.kafka.clients.consumer.ConsumerConfig | |
import org.apache.kafka.clients.producer.ProducerConfig | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import org.apache.kafka.common.TopicPartition | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.kafka.common.serialization.StringSerializer | |
import org.junit.ClassRule | |
import org.springframework.boot.test.context.SpringBootTest | |
import org.springframework.boot.test.util.TestPropertyValues | |
import org.springframework.context.ApplicationContextInitializer | |
import org.springframework.context.ConfigurableApplicationContext | |
import org.springframework.kafka.core.ConsumerFactory | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory | |
import org.springframework.kafka.core.DefaultKafkaProducerFactory | |
import org.springframework.kafka.core.KafkaTemplate | |
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer | |
import org.springframework.kafka.listener.ContainerProperties | |
import org.springframework.kafka.test.utils.KafkaTestUtils | |
import org.springframework.test.context.ContextConfiguration | |
import org.testcontainers.containers.KafkaContainer | |
import org.testcontainers.spock.Testcontainers | |
import org.testcontainers.utility.DockerImageName | |
import spock.lang.Shared | |
import spock.lang.Specification | |
@Testcontainers | |
@ContextConfiguration(initializers = [Initializer.class]) | |
@SpringBootTest(classes = [DemoApplication.class]) | |
class ConsumeEventsSpec extends Specification { | |
String topic = 'test-Topic' | |
ConcurrentMessageListenerContainer<String, String> container1 | |
ConcurrentMessageListenerContainer<String, String> container2 | |
@Shared | |
@ClassRule | |
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:5.3.0')) | |
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> { | |
void initialize(ConfigurableApplicationContext configurableApplicationContext) { | |
TestPropertyValues.of( | |
'spring.kafka.bootstrap-servers=' + kafka.getBootstrapServers()) | |
.applyTo(configurableApplicationContext.getEnvironment()) | |
} | |
} | |
def setupSpec() { | |
kafka.start() | |
} | |
def 'test order consumer is able to consume messages'() { | |
given: 'a kafka template' | |
def configs = new HashMap(KafkaTestUtils.producerProps(kafka.getBootstrapServers())) | |
def factory = new DefaultKafkaProducerFactory<String, String>(configs, new StringSerializer(), new StringSerializer()) | |
def template = new KafkaTemplate<String, String>(factory, true) | |
and: 'two producer records' | |
ProducerRecord<String, String> record = new ProducerRecord<>(topic, '1', 'Test 123') | |
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, '2', 'Test 123') | |
and: 'two consumers' | |
Map<String, Object> props = setBasicProperties() | |
def testCons1 = createMyConsumer1(props as Properties, consumerFactory(props as Properties)) | |
def testCons2 = createMyConsumer2(props as Properties, consumerFactory(props as Properties)) | |
when: 'sending a message to kafka' | |
Thread.sleep(5000) | |
template.send(record).get() | |
template.send(record).get() | |
template.send(record2).get() | |
template.send(record2).get() | |
template.send(record).get() | |
then: 'the message is consumed and acknowledged successfully' | |
KafkaTestUtils.getEndOffsets(testCons1 as Consumer<String, String>, topic, 0).get(new TopicPartition(topic, 0)) == 2 | |
KafkaTestUtils.getEndOffsets(testCons2 as Consumer<String, String>, topic, 1).get(new TopicPartition(topic, 1)) == 3 | |
} | |
Map<String, String> setBasicProperties() { | |
HashMap.of( | |
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), | |
ConsumerConfig.GROUP_INSTANCE_ID_DOC, 'rich1', | |
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest' | |
) | |
} | |
ConsumerFactory consumerFactory(Properties props) { | |
return new DefaultKafkaConsumerFactory<>(props as Map<String, String>, new StringDeserializer(), new StringDeserializer()) | |
} | |
Consumer<String, String> createMyConsumer1(Properties props, ConsumerFactory consumerFactory) { | |
container1 = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties('gruppe-1', topic)) | |
container1.setConcurrency(1) | |
container1.start() | |
return consumerFactory.createConsumer('richSuffix') | |
} | |
Consumer<String, String> createMyConsumer2(Properties props, ConsumerFactory consumerFactory) { | |
container2 = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties('gruppe-2', topic)) | |
container2.setConcurrency(1) | |
container2.start() | |
return consumerFactory.createConsumer('richSuffix2') | |
} | |
ContainerProperties containerProperties(String group, String topic) { | |
def consumer1 = new MyAckConsumer() | |
ContainerProperties containerProps = new ContainerProperties(topic) | |
containerProps.messageListener = consumer1 | |
containerProps.ackMode = ContainerProperties.AckMode.MANUAL | |
containerProps.groupId = group | |
return containerProps | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.kafkaFirstConsumer; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.kafka.annotation.EnableKafka; | |
@EnableKafka | |
@SpringBootApplication | |
public class DemoApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(DemoApplication.class, args); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.kafkaFirstConsumer.service; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.kafka.listener.AcknowledgingMessageListener; | |
import org.springframework.kafka.support.Acknowledgment; | |
public class MyAckConsumer implements AcknowledgingMessageListener<String, String> { | |
private static final Logger LOGGER = LoggerFactory.getLogger(MyAckConsumer.class); | |
@Override | |
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) { | |
System.out.println(consumerRecord.toString()); | |
LOGGER.info(consumerRecord.toString()); | |
acknowledgment.acknowledge(); | |
} | |
@Override | |
public void onMessage(ConsumerRecord<String, String> data) { | |
LOGGER.info(data.toString()); | |
System.out.println(data.toString()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.kafkaFirstConsumer.config; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.config.TopicBuilder; | |
@Configuration | |
public class MyConfig { | |
@Bean | |
public NewTopic topicExample() { | |
return TopicBuilder.name("test-Topic") | |
.partitions(2) | |
.replicas(1) | |
.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment