Skip to content

Instantly share code, notes, and snippets.

@MrMikeFloyd
Created December 15, 2021 12:40
Show Gist options
  • Save MrMikeFloyd/64b2970ada9f4bbba3552d09754e4c72 to your computer and use it in GitHub Desktop.
Function bean declaration that contains all the logic to split the probe telemetry data by space agency and aggregate it per probe. The most recent version can be found here: https://github.com/codecentric/spring-kafka-streams-example/blob/main/kafka-samples-streams/src/main/kotlin/com/example/kafkasamplesstreams/KafkaStreamsHandler.kt
package com.example.kafkasamplesstreams
import com.example.kafkasamplesstreams.events.AggregatedTelemetryData
import com.example.kafkasamplesstreams.events.SpaceAgency
import com.example.kafkasamplesstreams.events.TelemetryDataPoint
import com.example.kafkasamplesstreams.serdes.AggregateTelemetryDataSerde
import mu.KotlinLogging
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Predicate
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class KafkaStreamsHandler {

    private val logger = KotlinLogging.logger {}

    /**
     * Declares the stream-processing function bean that turns raw probe telemetry into
     * per-probe aggregates, split by space agency.
     *
     * The input stream is branched into two sub-streams. Index 0 holds records whose
     * `spaceAgency` is [SpaceAgency.NASA]; index 1 holds [SpaceAgency.ESA]. Kafka Streams
     * drops records that match neither predicate. Each sub-stream is grouped by its existing
     * record key and folded with [updateTotals] into a KTable of [AggregatedTelemetryData],
     * whose change stream is returned.
     *
     * Records with a `null` value (e.g. tombstones) match no predicate, so they are dropped
     * rather than failing the stream thread. This is consistent with `aggregate()`, which
     * ignores null-valued records anyway.
     *
     * NOTE(review): `KStream.branch` is deprecated since Kafka Streams 2.8 in favour of
     * `split()`. Migrating changes the result to a name-keyed map, so the NASA/ESA output
     * order of the returned array would have to be preserved explicitly.
     *
     * @return a function mapping the telemetry input stream to one aggregated output stream
     *   per agency, in the order NASA, ESA.
     */
    @Bean
    fun aggregateTelemetryData(): java.util.function.Function<
        KStream<String, TelemetryDataPoint>,
        Array<KStream<String, AggregatedTelemetryData>>> {
        return java.util.function.Function<
            KStream<String, TelemetryDataPoint>,
            Array<KStream<String, AggregatedTelemetryData>>> { telemetryRecords ->
            telemetryRecords.branch(
                // Split up the processing pipeline into 2 streams, depending on the space agency of the probe.
                // Safe calls: a null value (e.g. a tombstone) must not throw inside the predicate;
                // it simply matches no branch and is dropped.
                Predicate { _, v -> v?.spaceAgency == SpaceAgency.NASA },
                Predicate { _, v -> v?.spaceAgency == SpaceAgency.ESA }
            ).map { telemetryRecordsPerAgency ->
                // Apply aggregation logic on each stream separately
                telemetryRecordsPerAgency
                    .groupByKey()
                    .aggregate(
                        // KTable initializer: every probe starts from zero speed and zero distance
                        { AggregatedTelemetryData(maxSpeedMph = 0.0, traveledDistanceFeet = 0.0) },
                        // Calculation function for telemetry data aggregation
                        { probeId, lastTelemetryReading, aggregatedTelemetryData ->
                            updateTotals(
                                probeId,
                                lastTelemetryReading,
                                aggregatedTelemetryData
                            )
                        },
                        // Configure Serdes for State Store topic
                        Materialized.with(Serdes.StringSerde(), AggregateTelemetryDataSerde())
                    )
                    .toStream()
            }.toTypedArray()
        }
    }

    /**
     * Performs calculation of per-probe aggregate measurement data.
     *
     * The currently calculated totals are held in a Kafka State Store
     * backing the KTable created with aggregate(), and the most recently
     * created aggregate telemetry data record is passed on downstream.
     * Each call also logs the new aggregate at INFO level.
     *
     * @param probeId the record key of the incoming reading (presumably the probe ID);
     *   used here only for logging.
     * @param lastTelemetryReading the newest telemetry reading for this key.
     * @param currentAggregatedValue the aggregate accumulated so far for this key.
     * @return a new aggregate holding the running sum of traveled distance and the
     *   running maximum of speed.
     */
    fun updateTotals(
        probeId: String,
        lastTelemetryReading: TelemetryDataPoint,
        currentAggregatedValue: AggregatedTelemetryData
    ): AggregatedTelemetryData {
        val totalDistanceTraveled =
            lastTelemetryReading.traveledDistanceFeet + currentAggregatedValue.traveledDistanceFeet
        // Explicit comparison kept on purpose instead of maxOf(). For Double, a NaN reading makes
        // `>` false, so the stored maximum is retained instead of being poisoned with NaN.
        val maxSpeed = if (lastTelemetryReading.currentSpeedMph > currentAggregatedValue.maxSpeedMph)
            lastTelemetryReading.currentSpeedMph else currentAggregatedValue.maxSpeedMph
        val aggregatedTelemetryData = AggregatedTelemetryData(
            traveledDistanceFeet = totalDistanceTraveled,
            maxSpeedMph = maxSpeed
        )
        logger.info {
            "Calculated new aggregated telemetry data for probe $probeId. New max speed: ${aggregatedTelemetryData.maxSpeedMph} and " +
                "traveled distance ${aggregatedTelemetryData.traveledDistanceFeet}"
        }
        return aggregatedTelemetryData
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment