Last active
February 3, 2026 20:21
-
-
Save dacr/7ac2120bd219fcf9de7b0b2c8455faa4 to your computer and use it in GitHub Desktop.
simple kafka stream example / published by https://github.com/dacr/code-examples-manager #348873e7-6a27-4c07-8ccb-8ccfb2548bef/76988a06236a3abddbe662b693040ad2611f3ad4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // summary : simple kafka stream example | |
| // keywords : kafka, kafkastream, dsl, streams | |
| // publish : gist | |
| // authors : David Crosson | |
| // license : Apache License Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0.txt) | |
| // id : 348873e7-6a27-4c07-8ccb-8ccfb2548bef | |
| // created-on : 2020-04-24T16:56:45Z | |
| // managed-by : https://github.com/dacr/code-examples-manager | |
| // run-with : cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- $file | |
| import $ivy.`org.apache.kafka::kafka-streams-scala:2.8.0` | |
| import $ivy.`org.apache.logging.log4j:log4j-api:2.14.1` | |
| import $ivy.`org.apache.logging.log4j:log4j-core:2.14.1` | |
| import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.14.1` | |
| import java.util.Properties | |
| import scala.util.Properties.envOrElse | |
| import java.time.Duration | |
| import org.apache.kafka.streams.scala.StreamsBuilder | |
| import org.apache.kafka.streams.scala.kstream._ | |
| import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} | |
| import org.apache.kafka.streams.scala.serialization.Serdes._ | |
| import org.apache.kafka.streams.scala.ImplicitConversions._ | |
| import org.apache.kafka.clients.consumer.ConsumerConfig._ | |
| org.apache.logging.log4j.core.config.Configurator.setRootLevel(org.apache.logging.log4j.Level.WARN) | |
| /* | |
| topics --create --replication-factor 1 --partitions 1 --topic input-topic | |
| topics --create --replication-factor 1 --partitions 1 --topic output-topic | |
| echo 'Hello' | console-producer --topic input-topic | |
| console-consumer --topic output-topic --group myconsumer | |
| */ | |
| val kafkaConfig: Properties = { | |
| val properties = new Properties() | |
| properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-simple") | |
| properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envOrElse("KAFKA_REMOTE_BROKER", "127.0.0.1:9092")) | |
| properties.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest") | |
| properties | |
| } | |
| def processRecord(key:String, record:String):String = { | |
| println(s"** received $record with key $key") | |
| "<"+record+">" | |
| } | |
| @main | |
| def main():Unit = { | |
| val builder = new StreamsBuilder | |
| val eventsStream: KStream[String, String] = builder.stream[String, String]("input-topic") | |
| eventsStream | |
| .map { case (key, record) => key -> processRecord(key, record) } | |
| .to("output-topic") | |
| val streams: KafkaStreams = new KafkaStreams(builder.build(), kafkaConfig) | |
| streams.start() | |
| sys.ShutdownHookThread(streams.close(Duration.ofSeconds(10))) | |
| scala.io.StdIn.readLine("Enter to exit...") // required when run as a script as there is an implicit exit on ammonite script ends | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment