@Bean
public KStream<?, ?> kstreamAggregation(StreamsBuilder streamsBuilder) {
    KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
    stream
        .peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
        // re-partition our data on a new key
        .groupBy((key, value) -> eventKey(value), Grouped.with(Serdes.String(), iotEventSerde()))
        // add our incoming newValue into the aggregate
        .aggregate(IotEventMetric::new,
            (key, newValue, aggregate) -> aggregate.add(newValue),
@Bean
public KStream<?, ?> windowedAggregation(StreamsBuilder streamsBuilder) {
    WindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(new StringSerializer());
    TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer());
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
    KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), new JsonSerde<>(IotEvent.class)));
    stream
        .peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
        // re-partition our data on a new key
@Bean
public KStream<?, ?> kstreamProcessorApi(StreamsBuilder streamsBuilder) {
    StoreBuilder<KeyValueStore<String, IotEvent>> eventStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("eventstore"),
            Serdes.String(), iotEventSerde());
    // register store
    streamsBuilder.addStateStore(eventStoreBuilder);
    KStream<String, IotEvent> stream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
    stream
        .process(() -> new Processor<String, IotEvent>() {
@Bean
public KStream<?, ?> kstreamTombstone(StreamsBuilder streamsBuilder) {
    KStream<String, IotEvent> inbound = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), iotEventSerde()));
    inbound
        .peek((key, value) -> LOG.debug("incoming message: {} {}", key, value))
        // check for a null value (tombstone) and set delete flag if necessary
        .mapValues((readOnlyKey, value) -> {
            if (value == null) {
                value = new IotEvent();
                value.setDelete(true);