A simple example of using the Alpakka Kafka connector to produce and consume Kafka messages.
I assume that you have two Scala apps: a producer and a consumer.
In the producer app, add the following dependency:
"com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
Create an application.conf file:
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  # parallelism = 100

  # How long to wait for `KafkaProducer.close`
  # close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "quickstart.cloudera:9092"
  }
}
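If you prefer not to keep the broker address in application.conf, the same setting can also be applied programmatically on the ProducerSettings created in the next step. A minimal sketch (the broker address is just a placeholder):

// Sketch: overriding bootstrap.servers in code instead of application.conf
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("quickstart.cloudera:9092")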
Create a producer.scala file:
package fakeProducer

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object App {

  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system: ActorSystem = ActorSystem("producer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)

    val done: Future[Done] =
      Source(1 to 100)
        .map(value => new ProducerRecord[String, String]("test-topic", "msg " + value))
        .runWith(Producer.plainSink(producerSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done onComplete {
      case Success(_)   => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
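If you want messages with the same key to land on the same partition, ProducerRecord also has a (topic, key, value) constructor. A minimal sketch of the mapping step above with a key (the key scheme here is purely illustrative):

// Sketch: keyed records - Kafka partitions by key, so records sharing a key
// end up on the same partition
Source(1 to 100)
  .map(value => new ProducerRecord[String, String]("test-topic", "key-" + (value % 10), "msg " + value))
  .runWith(Producer.plainSink(producerSettings))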
In the consumer app, add the same dependency:
"com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
Create an application.conf file:
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked.
  # poll-timeout = 50ms

  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  # stop-timeout = 30s

  # How long to wait for `KafkaConsumer.close`
  # close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with `CommitTimeoutException`.
  # commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  # commit-time-warning = 1s

  # If for any reason `KafkaConsumer.poll` blocks for longer than the configured
  # poll-timeout then it is forcefully woken up with `KafkaConsumer.wakeup`.
  # The KafkaConsumerActor will throw
  # `org.apache.kafka.common.errors.WakeupException` which will be ignored
  # until the `max-wakeups` limit gets exceeded.
  # wakeup-timeout = 60s

  # After exceeding the maximum number of wakeups the consumer will stop and the stage will fail.
  # Setting it to 0 will let it ignore the wakeups and try to get the polling done forever.
  # max-wakeups = 10

  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  # commit-refresh-interval = infinite

  # If enabled, log stack traces before waking up the KafkaConsumer to give
  # some indication why the KafkaConsumer is not honouring the `poll-timeout`
  # wakeup-debug = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # auto-commit is disabled by default; enabling it means that offsets are
    # committed automatically with a frequency controlled by auto.commit.interval.ms.
    enable.auto.commit = true
    bootstrap.servers = "quickstart.cloudera:9092"
    group.id = "test-group1"
    auto.offset.reset = "earliest"
  }

  # Time to wait for pending requests when a partition is closed
  # wait-close-partition = 500ms
}
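As with the producer, these kafka-clients properties can also be set programmatically on the ConsumerSettings created in the next step. A minimal sketch, mirroring the values above:

// Sketch: setting the consumer properties in code instead of application.conf
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("quickstart.cloudera:9092")
    .withGroupId("test-group1")
    .withProperty("auto.offset.reset", "earliest")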
Create a consumer.scala file:
package fakeConsumer

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object App {

  def main(args: Array[String]): Unit = {
    println("Hello from consumer")

    implicit val system: ActorSystem = ActorSystem("consumer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)

    val done = Consumer
      .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
      .runWith(Sink.foreach(println)) // just print each message for debugging

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done onComplete {
      case Success(_)   => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
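The plainSource above relies on Kafka's automatic offset commits (enable.auto.commit = true). If you want at-least-once behaviour, where an offset is committed only after the message has been processed, a committableSource can be used instead. A rough sketch, assuming you set enable.auto.commit to false in the configuration above:

// Sketch: commit offsets explicitly after processing each record
val done = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("test-topic"))
  .mapAsync(1) { msg =>
    println(msg.record.value())            // process the record here
    msg.committableOffset.commitScaladsl() // then commit its offset
  }
  .runWith(Sink.ignore)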
To handle Ctrl+C correctly, or to shut down gracefully when your app is hosted inside a container, you can replace the consumer stream above with the following code:
// additional imports needed for this snippet:
//   import akka.stream.scaladsl.Keep
//   import akka.kafka.scaladsl.Consumer.DrainingControl

// retrieve the control object
val control = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
  .toMat(Sink.foreach(println))(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()

// Correctly handle Ctrl+C and docker container stop
sys.addShutdownHook({
  println("Shutdown requested...")
  val done = control.shutdown()
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done.onComplete {
    case Success(_)   => println("Exiting ...")
    case Failure(err) => println("Error: " + err.toString)
  }
})
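As I understand it, DrainingControl simply combines the consumer Control with the stream's completion Future (via Keep.both), so the shutdown hook can stop the consumer and still observe when the stream has finished before the JVM exits.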