Created
December 4, 2020 13:34
-
-
Save surysharma/7ffa66b2129b1d589c84cd30cd603359 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Console log layout used by the demo; every line is prefixed with "[Kafka Pattern]".
logging:
  pattern:
    console: "[Kafka Pattern] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"
spring:
  main:
    log-startup-info: false
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    # Broker list handed to the Kafka Streams configuration and to the integration test.
    bootstrap-servers: localhost:9092
    # Custom keys read by the Java code as
    # "spring.kafka.input-lowercase-topic" / "spring.kafka.output-uppercase-topic".
    input-lowercase-topic: t.lower.case
    output-uppercase-topic: t.upper.case
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thebigscale.kstreamsample.config; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.env.Environment; | |
import org.springframework.kafka.annotation.EnableKafkaStreams; | |
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; | |
import org.springframework.kafka.config.KafkaStreamsConfiguration; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Optional; | |
@Configuration | |
@EnableKafkaStreams | |
public class KafkaStreamConfiguration { | |
public static final String APP_ID = "upper-case-demo"; | |
private final KafkaProperties kafkaProperties; | |
private final String inputTopic; | |
private final String outputTopic; | |
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) | |
public KafkaStreamsConfiguration getStreamsConfig() { | |
Map<String, Object> props = new HashMap<>(); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); | |
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); | |
return new KafkaStreamsConfiguration(props); | |
} | |
@Bean | |
public NewTopic createInputTopic() { return new NewTopic(inputTopic,Optional.of(1), Optional.empty()); } | |
@Bean | |
public NewTopic createOutputTopic() { return new NewTopic(outputTopic,Optional.of(1), Optional.empty()); } | |
public KafkaStreamConfiguration(KafkaProperties kafkaProperties, Environment env) { | |
this.kafkaProperties = kafkaProperties; | |
this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic"); | |
this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@SpringBootApplication | |
public class KStreamSampleApplication implements CommandLineRunner { | |
public static void main(String[] args) { | |
SpringApplication.run(KStreamSampleApplication.class, args); | |
} | |
@Override | |
public void run(String... args) { | |
System.out.println("Started the KStream spring boot CLI..."); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thebigscale.kstreamsample; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.junit.jupiter.api.Assertions; | |
import org.junit.jupiter.api.DisplayName; | |
import org.junit.jupiter.api.Test; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.core.env.Environment; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.core.DefaultKafkaProducerFactory; | |
import org.springframework.kafka.test.context.EmbeddedKafka; | |
import org.springframework.kafka.test.utils.KafkaTestUtils; | |
import java.time.Duration; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.Map; | |
import static org.assertj.core.api.Assertions.assertThat; | |
@SpringBootTest | |
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) | |
class KStreamSampleApplicationTests { | |
private final KafkaProperties kafkaProperties; | |
private final String inputTopic; | |
private final String outputTopic; | |
@Autowired | |
public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) { | |
this.kafkaProperties = kafkaProperties; | |
this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic"); | |
this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic"); | |
} | |
@Test | |
@DisplayName("should test uppercaseStream topology") | |
void shouldTestUppercaseStreamTopology() { | |
//Given | |
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps( | |
String.join(",", kafkaProperties.getBootstrapServers()))); | |
//Create a kafka producer | |
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer(); | |
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true"); | |
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
//Create a Consumer client | |
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer(); | |
consumer.subscribe(Collections.singleton(outputTopic)); | |
//When | |
producer.send(new ProducerRecord<>(inputTopic, "test")); | |
producer.flush(); | |
//Then | |
assertThat(producer).isNotNull(); | |
//And | |
ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3)); | |
Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic); | |
Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); | |
if (!iterator.hasNext()) Assertions.fail(); | |
ConsumerRecord<String, String> next = iterator.next(); | |
assertThat(next.value()).isEqualTo("TEST"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thebigscale.kstreamsample.processors; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.kstream.*; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.env.Environment; | |
@Configuration | |
public class UppercaseTopologyProcessor { | |
private final String inputTopic; | |
private final String outputTopic; | |
UppercaseTopologyProcessor(Environment env) { | |
this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic"); | |
this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic"); | |
} | |
@Bean | |
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) { | |
KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); | |
sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream in getTopology...")); | |
KStream<String, String> upperCaseStream = sourceStream.mapValues((ValueMapper<String, String>) String::toUpperCase); | |
upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream...")); | |
upperCaseStream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); | |
Topology topology = builder.build(); | |
System.out.println(topology.describe()); | |
return upperCaseStream; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment