// summary : simple kafka stream example
// keywords : kafka, kafkastream, dsl, streams
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 348873e7-6a27-4c07-8ccb-8ccfb2548bef
// created-on : 2020-04-24T16:56:45Z
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- $file
import $ivy.`org.apache.kafka::kafka-streams-scala:2.8.0`
import $ivy.`org.apache.logging.log4j:log4j-api:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-core:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.14.1`

import java.util.Properties
import scala.util.Properties.envOrElse
import java.time.Duration

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.clients.consumer.ConsumerConfig._

org.apache.logging.log4j.core.config.Configurator.setRootLevel(org.apache.logging.log4j.Level.WARN)
/*
topics --create --replication-factor 1 --partitions 1 --topic input-topic
topics --create --replication-factor 1 --partitions 1 --topic output-topic
echo 'Hello' | console-producer --topic input-topic
console-consumer --topic output-topic --group myconsumer
*/
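// The shorthand above corresponds to the standard Kafka CLI tools; a possible
// equivalent, assuming a single local broker listening on 127.0.0.1:9092:
//   kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 1 --partitions 1 --topic input-topic
//   kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 1 --partitions 1 --topic output-topic
//   echo 'Hello' | kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic input-topic
//   kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --group myconsumer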
val kafkaConfig: Properties = {
  val properties = new Properties()
  properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-simple")
  properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envOrElse("KAFKA_REMOTE_BROKER", "127.0.0.1:9092"))
  properties.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties
}
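// Per-record transformation: log each incoming record and wrap its value in angle brackets.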
def processRecord(key: String, record: String): String = {
  println(s"** received $record with key $key")
  "<" + record + ">"
}
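// Topology: consume records from "input-topic", apply processRecord to each value, and produce the result to "output-topic".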
@main
def main(): Unit = {
  val builder = new StreamsBuilder
  val eventsStream: KStream[String, String] = builder.stream[String, String]("input-topic")
  eventsStream
    .map { case (key, record) => key -> processRecord(key, record) }
    .to("output-topic")
  val streams: KafkaStreams = new KafkaStreams(builder.build(), kafkaConfig)
  streams.start()
  sys.ShutdownHookThread(streams.close(Duration.ofSeconds(10))) // close the stream (10s timeout) on JVM shutdown
  scala.io.StdIn.readLine("Enter to exit...") // keeps the process alive: when run as a script, Ammonite exits as soon as the script body ends
}
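// Possible invocation against a remote broker (hypothetical host and script filename),
// following the run-with command in the header:
//   KAFKA_REMOTE_BROKER=broker.example.com:9092 cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- simple-kafka-stream.sc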