Created
October 19, 2020 16:48
-
-
Save gAmUssA/69d4d6da2cf06b5015b3514f14a8e210 to your computer and use it in GitHub Desktop.
Kafka Streams interactive query + Spring WebFlux
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.confluent.developer.iqrest; | |
import org.apache.kafka.common.serialization.Serde; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StoreQueryParameters; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.kstream.Consumed; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.apache.kafka.streams.state.QueryableStoreTypes; | |
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.context.event.ContextRefreshedEvent; | |
import org.springframework.context.event.EventListener; | |
import org.springframework.kafka.annotation.EnableKafkaStreams; | |
import org.springframework.kafka.config.StreamsBuilderFactoryBean; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.PathVariable; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import java.util.Objects; | |
import reactor.core.publisher.Mono; | |
@SpringBootApplication | |
@EnableKafkaStreams | |
public class IqRestApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(IqRestApplication.class, args); | |
} | |
} | |
@Component | |
class MyTopology { | |
@Autowired | |
public void createOrdersMaterializedView(final StreamsBuilder builder) { | |
final Serde<String> string = Serdes.String(); | |
builder.table("orders", Consumed.with(string, string), Materialized.as("orders_store")); | |
} | |
} | |
@RestController() | |
class MyIQController { | |
protected KafkaStreams kafkaStreams; | |
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean; | |
MyIQController(StreamsBuilderFactoryBean streamsBuilderFactoryBean) { | |
Objects.requireNonNull(streamsBuilderFactoryBean); | |
this.streamsBuilderFactoryBean = streamsBuilderFactoryBean; | |
kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams(); | |
} | |
@EventListener(ContextRefreshedEvent.class) | |
public void refreshKafkaStreams() { | |
this.kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams(); | |
} | |
@GetMapping("/iq/{id}") | |
public String getValue(@PathVariable final String id) { | |
ReadOnlyKeyValueStore<String, String> keyValueStore = kafkaStreams.store( | |
StoreQueryParameters.fromNameAndType("orders_store", QueryableStoreTypes.keyValueStore())); | |
return keyValueStore.get(id); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment