Created
December 15, 2021 12:33
-
-
Save MrMikeFloyd/4eb3b2da6061c4a849442702744e7257 to your computer and use it in GitHub Desktop.
StreamBridge that publishes the generated probe data to the associated Kafka topic. Find the most recent version here: https://github.com/codecentric/spring-kafka-streams-example/blob/main/kafka-samples-producer/src/main/kotlin/de/codecentric/samples/kafkasamplesproducer/TelemetryDataStreamBridge.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.codecentric.samples.kafkasamplesproducer | |
import de.codecentric.samples.kafkasamplesproducer.event.TelemetryData | |
import mu.KotlinLogging | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.cloud.stream.function.StreamBridge | |
import org.springframework.kafka.support.KafkaHeaders | |
import org.springframework.messaging.support.MessageBuilder | |
import org.springframework.stereotype.Component | |
/**
 * Publishes space probe telemetry data to Kafka via Spring Cloud Stream's [StreamBridge].
 *
 * @property streamBridge bridge used to hand messages to the output binding.
 */
@Component
class TelemetryDataStreamBridge(@Autowired val streamBridge: StreamBridge) {

    private val logger = KotlinLogging.logger {}

    /**
     * Wraps [telemetryData] in a message keyed by its probe id and sends it to the
     * `telemetry-data-out-0` binding.
     *
     * If the bridge reports that the message was not sent, a warning is logged so the
     * failure does not go unnoticed; no exception is raised by this method itself.
     *
     * @param telemetryData the telemetry reading to publish.
     */
    fun send(telemetryData: TelemetryData) {
        val kafkaMessage = MessageBuilder
            .withPayload(telemetryData)
            // Make sure all messages for a given probe go to the same partition to ensure proper ordering
            .setHeader(KafkaHeaders.MESSAGE_KEY, telemetryData.probeId)
            .build()

        logger.info { "Publishing space probe telemetry data: Payload: '${kafkaMessage.payload}'" }

        // StreamBridge.send returns false when the message could not be sent; surface that instead of dropping it.
        val sent = streamBridge.send(OUTPUT_BINDING, kafkaMessage)
        if (!sent) {
            logger.warn { "Failed to publish space probe telemetry data to '$OUTPUT_BINDING': Payload: '${kafkaMessage.payload}'" }
        }
    }

    private companion object {
        /** Name of the binding the telemetry messages are sent to. */
        const val OUTPUT_BINDING = "telemetry-data-out-0"
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment