@lucasrpb
Created May 13, 2021 19:30
// Akka Kafka Stream Consumer
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{CommitDelivery, CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

class ConsumerSpec extends AnyFlatSpec {

  val logger = LoggerFactory.getLogger(this.getClass)

  "it" should "consume records successfully" in {
    implicit val system: ActorSystem = ActorSystem.create()

    // Deserializers are passed explicitly; everything else falls back to the
    // "akka.kafka.consumer" section of the ActorSystem's configuration.
    val consumerSettings =
      ConsumerSettings[String, Array[Byte]](system, new StringDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("g18")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withClientId("c18")

    // Stand-in for real record processing: log the key/value and signal completion.
    def business(key: String, value: Array[Byte]): Future[Done] = {
      logger.info(s"\n${Console.GREEN_B}tuple: $key = ${new String(value)}${Console.RESET}\n")
      Future.successful(Done)
    }

    // waitForAck makes the Committer emit only once Kafka has acknowledged the commit.
    val committerSettings = CommitterSettings(system).withDelivery(CommitDelivery.waitForAck)

    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("demo"))
        .mapAsync(10) { msg =>
          // Process the record, then pass its offset downstream for committing.
          business(msg.record.key, msg.record.value).map(_ => msg.committableOffset)
        }
        .via(Committer.flow(committerSettings.withMaxBatch(1))) // commit every offset individually
        .toMat(Sink.seq)(DrainingControl.apply)
        .run()

    // A committable source never completes on its own, so awaiting streamCompletion
    // before draining (as originally written) blocks forever. Instead, let the
    // consumer run for a while, then drain in-flight commits and shut down.
    Thread.sleep(10000)
    Await.result(control.drainAndShutdown(), Duration.Inf)
  }
}
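
For comparison, here is a minimal sketch of the committing pattern shown in the Alpakka Kafka documentation: instead of Committer.flow followed by Sink.seq, the offsets are fed straight into Committer.sink, which still materializes into a DrainingControl. It assumes the same consumerSettings, committerSettings and business() defined in ConsumerSpec above.

// Variant: commit via Committer.sink instead of Committer.flow + Sink.seq.
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("demo"))
    .mapAsync(10) { msg =>
      // Process the record, then hand its offset to the committing sink.
      business(msg.record.key, msg.record.value).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()

With Committer.sink the batching behaviour comes from the CommitterSettings themselves (maxBatch, maxInterval), so the per-message withMaxBatch(1) override used above is not needed unless you really want one commit per record.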
// Akka Kafka Stream Producer
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.commons.lang3.RandomStringUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.scalatest.flatspec.AnyFlatSpec

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

class ProducerSpec extends AnyFlatSpec {

  "it" should "produce records successfully" in {
    implicit val system: ActorSystem = ActorSystem.create()

    // Serializers are passed explicitly; the remaining producer settings come
    // from the "akka.kafka.producer" section of the ActorSystem's configuration.
    val config = system.settings.config.getConfig("akka.kafka.producer")
    val bootstrapServers = "localhost:9092"
    val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

    val topic = "demo"

    // Emit 100 short random strings as unkeyed records to the "demo" topic.
    val done: Future[Done] = Source(1 to 100)
      .map(_ => RandomStringUtils.randomAlphabetic(3, 5))
      .map(value => new ProducerRecord[String, String](topic, value))
      .runWith(Producer.plainSink(producerSettings))

    // plainSink's materialized Future[Done] completes once all records are sent.
    val result = Await.result(done, Duration.Inf)
    println(result)
  }
}
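
Neither file compiles without the right dependencies on the classpath. The build.sbt sketch below lists them; the artifact coordinates are real, but the version numbers are assumptions chosen to match the Akka 2.6-era Alpakka Kafka API used here (May 2021), not taken from the gist. To exercise the specs, start a Kafka broker on localhost:9092 with a topic named demo, run ProducerSpec, then ConsumerSpec.

// build.sbt sketch; version numbers are assumptions, adjust to your environment.
scalaVersion := "2.13.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream-kafka" % "2.1.0",  // Alpakka Kafka: Consumer, Producer, Committer
  "com.typesafe.akka"  %% "akka-stream"       % "2.6.14", // Akka Streams runtime
  "org.apache.commons"  % "commons-lang3"     % "3.12.0", // RandomStringUtils
  "org.slf4j"           % "slf4j-simple"      % "1.7.30", // minimal backend for the SLF4J logger
  "org.scalatest"      %% "scalatest"         % "3.2.8"   % Test
)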